diff --git a/sql/index/pilosa/driver.go b/sql/index/pilosa/driver.go index 9e91a11a4..6303c073b 100644 --- a/sql/index/pilosa/driver.go +++ b/sql/index/pilosa/driver.go @@ -498,15 +498,6 @@ func (b *bitBatch) Add(row, col uint64) { b.cols = append(b.cols, col) } -func (b *bitBatch) NextRecord() (uint64, uint64, error) { - if b.pos >= uint64(len(b.rows)) { - return 0, 0, io.EOF - } - - b.pos++ - return b.rows[b.pos-1], b.cols[b.pos-1], nil -} - func indexName(db, table string) string { h := sha1.New() io.WriteString(h, db) diff --git a/sql/index/pilosa/driver_test.go b/sql/index/pilosa/driver_test.go index 80a716fa7..86954d01b 100644 --- a/sql/index/pilosa/driver_test.go +++ b/sql/index/pilosa/driver_test.go @@ -1041,6 +1041,88 @@ func TestNegateIndex(t *testing.T) { require.Equal(expected, values) } +func TestEqualAndLessIndex(t *testing.T) { + require := require.New(t) + setup(t) + defer cleanup(t) + + ctx := sql.NewContext(context.Background()) + db, table := "db_name", "table_name" + d := NewDriver(tmpDir) + + idxEqA, err := d.Create(db, table, "idx_eq_a", makeExpressions(table, "a"), nil) + require.NoError(err) + pilosaIdxEqA, ok := idxEqA.(*pilosaIndex) + require.True(ok) + itA := &fixturePartitionKeyValueIter{ + fixtures: []partitionKeyValueFixture{ + { + testPartition(0), + []kvfixture{ + {"1", []interface{}{int64(2)}}, + {"2", []interface{}{int64(7)}}, + {"3", []interface{}{int64(1)}}, + {"4", []interface{}{int64(1)}}, + {"5", []interface{}{int64(1)}}, + {"6", []interface{}{int64(10)}}, + {"7", []interface{}{int64(5)}}, + {"8", []interface{}{int64(6)}}, + {"9", []interface{}{int64(4)}}, + {"10", []interface{}{int64(1)}}, + }, + }, + }, + } + err = d.Save(ctx, pilosaIdxEqA, itA) + require.NoError(err) + eqALookup, err := pilosaIdxEqA.Get(int64(1)) + require.NoError(err) + + values, err := lookupValues(eqALookup) + require.NoError(err) + expected := []string{"3", "4", "5", "10"} + require.Equal(expected, values) + + idxLessB, err := d.Create(db, table, "idx_less_b", makeExpressions(table, "b"), nil) + require.NoError(err) + pilosaIdxLessB, ok := idxLessB.(*pilosaIndex) + require.True(ok) + itB := &fixturePartitionKeyValueIter{ + fixtures: []partitionKeyValueFixture{ + { + testPartition(0), + []kvfixture{ + {"1", []interface{}{int64(1)}}, + {"2", []interface{}{int64(2)}}, + {"3", []interface{}{int64(3)}}, + {"4", []interface{}{int64(4)}}, + {"5", []interface{}{int64(5)}}, + {"6", []interface{}{int64(6)}}, + {"7", []interface{}{int64(7)}}, + {"8", []interface{}{int64(8)}}, + {"9", []interface{}{int64(9)}}, + {"10", []interface{}{int64(10)}}, + }, + }, + }, + } + err = d.Save(ctx, pilosaIdxLessB, itB) + require.NoError(err) + lessB, err := pilosaIdxLessB.AscendLessThan(int64(5)) + require.NoError(err) + lessBLookup := lessB.(*ascendLookup) + + values, err = lookupValues(lessBLookup) + require.NoError(err) + expected = []string{"1", "2", "3", "4"} + require.Equal(expected, values) + + interLookup := eqALookup.(sql.SetOperations).Intersection(lessBLookup) + values, err = lookupValues(interLookup) + require.NoError(err) + expected = []string{"3", "4"} + require.Equal(expected, values) +} func TestPilosaHolder(t *testing.T) { require := require.New(t) setup(t) diff --git a/sql/index/pilosa/lookup.go b/sql/index/pilosa/lookup.go index 1e0e8e82f..50bfa91f7 100644 --- a/sql/index/pilosa/lookup.go +++ b/sql/index/pilosa/lookup.go @@ -80,15 +80,7 @@ func (l *indexLookup) indexName() string { return l.index.Name() } -func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) { - if err := l.mapping.open(); err != nil { - return nil, err - } - defer l.mapping.close() - - if err := l.index.Open(); err != nil { - return nil, err - } +func (l *indexLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, error) { var row *pilosa.Row for i, expr := range l.expressions { field := l.index.Field(fieldName(l.id, expr, p)) @@ -107,7 +99,25 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) { row = intersect(row, r) } - if err := l.index.Close(); err != nil { + return row, nil +} + +func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) { + if err := l.mapping.open(); err != nil { + return nil, err + } + defer l.mapping.close() + + if err := l.index.Open(); err != nil { + return nil, err + } + row, err := l.intersectExpressions(p) + if e := l.index.Close(); e != nil { + if err == nil { + err = e + } + } + if err != nil { return nil, err } @@ -213,14 +223,10 @@ func (l *filteredLookup) indexName() string { return l.index.Name() } -func (l *filteredLookup) values(p sql.Partition) (*pilosa.Row, error) { - if err := l.mapping.open(); err != nil { - return nil, err - } - defer l.mapping.close() - - // evaluate Intersection of bitmaps +// evaluate Intersection of bitmaps +func (l *filteredLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, error) { var row *pilosa.Row + for i, expr := range l.expressions { field := l.index.Field(fieldName(l.id, expr, p)) rows, err := l.mapping.filter(field.Name(), func(b []byte) (bool, error) { @@ -242,6 +248,28 @@ func (l *filteredLookup) values(p sql.Partition) (*pilosa.Row, error) { row = intersect(row, r) } + return row, nil +} + +func (l *filteredLookup) values(p sql.Partition) (*pilosa.Row, error) { + if err := l.mapping.open(); err != nil { + return nil, err + } + defer l.mapping.close() + + if err := l.index.Open(); err != nil { + return nil, err + } + row, err := l.intersectExpressions(p) + if e := l.index.Close(); e != nil { + if err == nil { + err = e + } + } + if err != nil { + return nil, err + } + // evaluate composition of operations for _, op := range l.operations { var ( @@ -269,9 +297,6 @@ func (l *filteredLookup) values(p sql.Partition) (*pilosa.Row, error) { } func (l *filteredLookup) Values(p sql.Partition) (sql.IndexValueIter, error) { - l.index.Open() - defer l.index.Close() - row, err := l.values(p) if err != nil { return nil, err @@ -359,12 +384,7 @@ type negateLookup struct { func (l *negateLookup) indexName() string { return l.index.Name() } -func (l *negateLookup) values(p sql.Partition) (*pilosa.Row, error) { - if err := l.mapping.open(); err != nil { - return nil, err - } - defer l.mapping.close() - +func (l *negateLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, error) { var row *pilosa.Row for i, expr := range l.expressions { field := l.index.Field(fieldName(l.id, expr, p)) @@ -401,6 +421,27 @@ func (l *negateLookup) values(p sql.Partition) (*pilosa.Row, error) { row = intersect(row, r) } + return row, nil +} + +func (l *negateLookup) values(p sql.Partition) (*pilosa.Row, error) { + if err := l.mapping.open(); err != nil { + return nil, err + } + defer l.mapping.close() + + if err := l.index.Open(); err != nil { + return nil, err + } + row, err := l.intersectExpressions(p) + if e := l.index.Close(); e != nil { + if err == nil { + err = e + } + } + if err != nil { + return nil, err + } // evaluate composition of operations for _, op := range l.operations { @@ -431,9 +472,6 @@ func (l *negateLookup) values(p sql.Partition) (*pilosa.Row, error) { // Values implements sql.IndexLookup.Values func (l *negateLookup) Values(p sql.Partition) (sql.IndexValueIter, error) { - l.index.Open() - defer l.index.Close() - row, err := l.values(p) if err != nil { return nil, err