Skip to content

Commit

Permalink
arrowutils: add TakeList and improve TakeDict performance (#897)
Browse files Browse the repository at this point in the history
This commit adds a TakeList function to reorder arrays of type List when
sorting records. It also improves TakeDict performance by inserting the
dictionary values into the builder directly and then appending only the
indices, which avoids materializing each value. This especially speeds up
cases where dictionaries have a low number of distinct values.

             │  benchmain   │              benchnew               │
             │    sec/op    │   sec/op     vs base                │
Take/Dict-12   107.05µ ± 0%   79.31µ ± 0%  -25.91% (p=0.000 n=10)

             │  benchmain   │              benchnew               │
             │     B/op     │     B/op      vs base               │
Take/Dict-12   133.3Ki ± 0%   133.3Ki ± 0%  +0.00% (p=0.002 n=10)

             │ benchmain  │             benchnew              │
             │ allocs/op  │ allocs/op   vs base               │
Take/Dict-12   64.00 ± 0%   66.00 ± 0%  +3.12% (p=0.000 n=10)
  • Loading branch information
asubiotto authored Jun 10, 2024
1 parent e937f46 commit 316067a
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 45 deletions.
101 changes: 71 additions & 30 deletions pqarrow/arrowutils/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ func SortRecord(r arrow.Record, columns []SortingColumn) (*array.Int32, error) {
//
// Use compute.WithAllocator to pass a custom memory.Allocator.
func Take(ctx context.Context, r arrow.Record, indices *array.Int32) (arrow.Record, error) {
// compute.Take doesn't support dictionaries. Use take on r when r does not have
// dictionary column.
var hasDictionary bool
// compute.Take doesn't support dictionaries or lists. Use take on r when r
// does not have these columns.
var customTake bool
for i := 0; i < int(r.NumCols()); i++ {
if r.Column(i).DataType().ID() == arrow.DICTIONARY {
hasDictionary = true
if r.Column(i).DataType().ID() == arrow.DICTIONARY || r.Column(i).DataType().ID() == arrow.LIST {
customTake = true
break
}
}
if !hasDictionary {
if !customTake {
res, err := compute.Take(
ctx,
compute.TakeOptions{BoundsCheck: true},
Expand All @@ -89,8 +89,11 @@ func Take(ctx context.Context, r arrow.Record, indices *array.Int32) (arrow.Reco
}
return res.(*compute.RecordDatum).Value, nil
}
resArr := make([]arrow.Array, r.NumCols())
if r.NumCols() == 0 {
return r, nil
}

resArr := make([]arrow.Array, r.NumCols())
defer func() {
for _, a := range resArr {
if a != nil {
Expand All @@ -102,20 +105,28 @@ func Take(ctx context.Context, r arrow.Record, indices *array.Int32) (arrow.Reco
for i := 0; i < int(r.NumCols()); i++ {
i := i
col := r.Column(i)
if d, ok := col.(*array.Dictionary); ok {
g.Go(func() error {
return TakeDictColumn(ctx, d, i, resArr, indices)
})
} else {
g.Go(func() error {
return TakeColumn(ctx, col, i, resArr, indices)
})
switch arr := r.Column(i).(type) {
case *array.Dictionary:
g.Go(func() error { return TakeDictColumn(ctx, arr, i, resArr, indices) })
case *array.List:
g.Go(func() error { return TakeListColumn(ctx, arr, i, resArr, indices) })
default:
g.Go(func() error { return TakeColumn(ctx, col, i, resArr, indices) })
}
}
err := g.Wait()
if err != nil {
if err := g.Wait(); err != nil {
return nil, err
}

// We checked for at least one column at the beginning of the function.
expectedLen := resArr[0].Len()
for _, a := range resArr {
if a.Len() != expectedLen {
return nil, fmt.Errorf(
"pqarrow/arrowutils: expected same length %d for all columns got %d for %s", expectedLen, a.Len(), a.DataType().Name(),
)
}
}
return array.NewRecord(r.Schema(), resArr, int64(indices.Len())), nil
}

Expand All @@ -129,32 +140,62 @@ func TakeColumn(ctx context.Context, a arrow.Array, idx int, arr []arrow.Array,
}

func TakeDictColumn(ctx context.Context, a *array.Dictionary, idx int, arr []arrow.Array, indices *array.Int32) error {
var add func(b *array.BinaryDictionaryBuilder, idx int) error
switch e := a.Dictionary().(type) {
r := array.NewDictionaryBuilderWithDict(
compute.GetAllocator(ctx), a.DataType().(*arrow.DictionaryType), a.Dictionary(),
).(*array.BinaryDictionaryBuilder)
defer r.Release()

r.Reserve(indices.Len())
idxBuilder := r.IndexBuilder()
for _, i := range indices.Int32Values() {
if a.IsNull(int(i)) {
r.AppendNull()
continue
}
idxBuilder.Append(a.GetValueIndex(int(i)))
}

arr[idx] = r.NewArray()
return nil
}

func TakeListColumn(ctx context.Context, a *array.List, idx int, arr []arrow.Array, indices *array.Int32) error {
r := array.NewBuilder(compute.GetAllocator(ctx), a.DataType()).(*array.ListBuilder)
valueBuilder, ok := r.ValueBuilder().(*array.BinaryDictionaryBuilder)
if !ok {
return fmt.Errorf("unexpected value builder type %T for list column", r.ValueBuilder())
}

listValues := a.ListValues().(*array.Dictionary)
switch dictV := listValues.Dictionary().(type) {
case *array.String:
add = func(b *array.BinaryDictionaryBuilder, idx int) error {
return b.AppendString(e.Value(idx))
if err := valueBuilder.InsertStringDictValues(dictV); err != nil {
return err
}
case *array.Binary:
add = func(b *array.BinaryDictionaryBuilder, idx int) error {
return b.Append(e.Value(idx))
if err := valueBuilder.InsertDictValues(dictV); err != nil {
return err
}
default:
panic(fmt.Sprintf("unexpected dictionary type %T for take", e))
}
r := array.NewBuilder(compute.GetAllocator(ctx), a.DataType()).(*array.BinaryDictionaryBuilder)
defer r.Release()
idxBuilder := valueBuilder.IndexBuilder()

r.Reserve(indices.Len())
for _, i := range indices.Int32Values() {
if a.IsNull(int(i)) {
r.AppendNull()
continue
}
err := add(r, a.GetValueIndex(int(i)))
if err != nil {
return err

r.Append(true)
start, end := a.ValueOffsets(int(i))
for j := start; j < end; j++ {
idxBuilder.Append(listValues.GetValueIndex(int(j)))
}
// Resize is necessary here for the correct offsets to be appended to
// the list builder. Otherwise length will remain at 0.
valueBuilder.Resize(idxBuilder.Len())
}

arr[idx] = r.NewArray()
return nil
}
Expand Down
171 changes: 156 additions & 15 deletions pqarrow/arrowutils/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package arrowutils

import (
"context"
"fmt"
"sort"
"testing"

Expand Down Expand Up @@ -240,7 +241,7 @@ func TestSortRecordBuilderReuse(t *testing.T) {
}

func TestReorderRecord(t *testing.T) {
t.Run("Without dictionary field", func(t *testing.T) {
t.Run("Simple", func(t *testing.T) {
mem := memory.NewGoAllocator()
b := array.NewRecordBuilder(mem, arrow.NewSchema(
[]arrow.Field{
Expand All @@ -266,7 +267,7 @@ func TestReorderRecord(t *testing.T) {
want := []int64{1, 2, 3}
require.Equal(t, want, result.Column(0).(*array.Int64).Int64Values())
})
t.Run("With dictionary field", func(t *testing.T) {
t.Run("WithDict", func(t *testing.T) {
mem := memory.NewGoAllocator()
b := array.NewRecordBuilder(mem, arrow.NewSchema(
[]arrow.Field{
Expand All @@ -281,27 +282,83 @@ func TestReorderRecord(t *testing.T) {
))
defer b.Release()
d := b.Field(0).(*array.BinaryDictionaryBuilder)
d.Reserve(3)
require.Nil(t, d.AppendString("3"))
require.Nil(t, d.AppendString("2"))
require.Nil(t, d.AppendString("1"))
require.NoError(t, d.AppendString("3"))
require.NoError(t, d.AppendString("2"))
require.NoError(t, d.AppendString("1"))
d.AppendNull()
require.NoError(t, d.AppendString("3"))
r := b.NewRecord()
defer r.Release()

indices := array.NewInt32Builder(mem)
indices.AppendValues([]int32{2, 1, 0}, nil)
by := indices.NewInt32Array()
result, err := Take(
compute.WithAllocator(context.Background(), mem), r, by)
require.Nil(t, err)
indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil)
result, err := Take(compute.WithAllocator(context.Background(), mem), r, indices.NewInt32Array())
require.NoError(t, err)
defer result.Release()

want := []string{"1", "2", "3"}
want := []string{"1", "2", "3", "3", ""}
got := result.Column(0).(*array.Dictionary)
str := got.Dictionary().(*array.String)
require.Equal(t, len(want), got.Len())
for i := range want {
require.Equal(t, want[i], str.Value(got.GetValueIndex(i)))
for i, v := range want {
if v == "" {
require.True(t, got.IsNull(i))
continue
}
require.Equal(t, want[i], got.ValueStr(i))
}
})
t.Run("List", func(t *testing.T) {
mem := memory.NewGoAllocator()
b := array.NewRecordBuilder(mem, arrow.NewSchema(
[]arrow.Field{
{
Name: "list",
Type: arrow.ListOf(&arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.String}),
},
}, nil,
))
defer b.Release()
lb := b.Field(0).(*array.ListBuilder)
vb := lb.ValueBuilder().(*array.BinaryDictionaryBuilder)
lb.Append(true)
require.NoError(t, vb.AppendString("1"))
require.NoError(t, vb.AppendString("2"))
require.NoError(t, vb.AppendString("3"))
require.NoError(t, vb.AppendString("1"))
lb.Append(false)
lb.Append(true)
require.NoError(t, vb.AppendString("4"))
require.NoError(t, vb.AppendString("5"))
require.NoError(t, vb.AppendString("6"))
lb.Append(true)
require.NoError(t, vb.AppendString("3"))
require.NoError(t, vb.AppendString("3"))
require.NoError(t, vb.AppendString("3"))
require.NoError(t, vb.AppendString("4"))
r := b.NewRecord()
defer r.Release()

indices := array.NewInt32Builder(mem)
indices.AppendValues([]int32{2, 1, 0, 3}, nil)
result, err := Take(
compute.WithAllocator(context.Background(), mem), r, indices.NewInt32Array())
require.Nil(t, err)
defer result.Release()

got := result.Column(0).(*array.List)
expected := []string{
"[\"4\",\"5\",\"6\"]",
"",
"[\"1\",\"2\",\"3\",\"1\"]",
"[\"3\",\"3\",\"3\",\"4\"]",
}
require.Equal(t, len(expected), got.Len())
for i, v := range expected {
if len(v) == 0 {
require.True(t, got.IsNull(i), "expected null at %d", i)
continue
}
require.Equal(t, expected[i], got.ValueStr(i), "unexpected value at %d", i)
}
})
}
Expand Down Expand Up @@ -387,3 +444,87 @@ func sortAndCompare(t *testing.T, kase SortCase) {

require.Equal(t, kase.Indices, got.Int32Values())
}

// BenchmarkTake measures Take on single-column records whose column type is
// either a dictionary ("Dict") or a list of dictionary values ("List").
//
// Records, builders and arrays created during setup are released when each
// sub-benchmark returns, and every record returned by Take is released inside
// the timed loop, so no reference is left dangling between iterations.
func BenchmarkTake(b *testing.B) {
	const (
		numRows            = 1024
		numValsPerListElem = 4
	)
	mem := memory.NewGoAllocator()
	b.Run("Dict", func(b *testing.B) {
		rb := array.NewRecordBuilder(mem, arrow.NewSchema(
			[]arrow.Field{
				{
					Name: "dict",
					Type: &arrow.DictionaryType{
						IndexType: arrow.PrimitiveTypes.Int32,
						ValueType: arrow.BinaryTypes.Binary,
					},
				},
			}, nil,
		))
		defer rb.Release()
		d := rb.Field(0).(*array.BinaryDictionaryBuilder)
		// Two values are appended per iteration, so the record ends up with
		// 2*numRows rows.
		for i := 0; i < numRows; i++ {
			// Interesting to benchmark with a string that appears every other row.
			// i.e. only one entry in the dict.
			require.NoError(b, d.AppendString("appearseveryotherrow"))
			require.NoError(b, d.AppendString(fmt.Sprintf("%d", i)))
		}
		r := rb.NewRecord()
		defer r.Release()

		indices := array.NewInt32Builder(mem)
		defer indices.Release()
		// Take the rows in reverse order.
		// NOTE(review): the loop stops at i > 0, so row 0 is never selected —
		// confirm whether that is intended.
		for i := r.NumRows() - 1; i > 0; i-- {
			indices.Append(int32(i))
		}
		ctx := compute.WithAllocator(context.Background(), mem)
		indArr := indices.NewInt32Array()
		defer indArr.Release()

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			res, err := Take(ctx, r, indArr)
			if err != nil {
				b.Fatal(err)
			}
			res.Release()
		}
	})

	b.Run("List", func(b *testing.B) {
		listb := array.NewRecordBuilder(mem, arrow.NewSchema(
			[]arrow.Field{
				{
					Name: "list",
					Type: arrow.ListOf(
						&arrow.DictionaryType{
							IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.Binary,
						},
					),
				},
			}, nil,
		))
		defer listb.Release()

		l := listb.Field(0).(*array.ListBuilder)
		vb := l.ValueBuilder().(*array.BinaryDictionaryBuilder)
		// Each list element holds numValsPerListElem values: a row-specific
		// string repeated, followed by one string shared by every row.
		for i := 0; i < numRows; i++ {
			l.Append(true)
			for j := 0; j < numValsPerListElem-1; j++ {
				require.NoError(b, vb.AppendString(fmt.Sprintf("%d", i)))
			}
			require.NoError(b, vb.AppendString("appearseveryrow"))
		}

		r := listb.NewRecord()
		defer r.Release()

		indices := array.NewInt32Builder(mem)
		defer indices.Release()
		// Take the rows in reverse order.
		// NOTE(review): the loop stops at i > 0, so row 0 is never selected —
		// confirm whether that is intended.
		for i := numRows - 1; i > 0; i-- {
			indices.Append(int32(i))
		}
		ctx := compute.WithAllocator(context.Background(), mem)
		indArr := indices.NewInt32Array()
		defer indArr.Release()

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			res, err := Take(ctx, r, indArr)
			if err != nil {
				b.Fatal(err)
			}
			res.Release()
		}
	})
}

0 comments on commit 316067a

Please sign in to comment.