Skip to content

Commit

Permalink
[kvexec] merge join (#8561)
Browse files Browse the repository at this point in the history
* [kvexec] merge join

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* fix null panic

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* left joins

* join tests passing

* skip virtual cols

* more left join bugs

* better documentation

* more bugs

* more bugs

* simplify arg passing

* comments

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* more comments

---------

Co-authored-by: max-hoffman <[email protected]>
  • Loading branch information
max-hoffman and max-hoffman authored Nov 22, 2024
1 parent bb53838 commit d5534d1
Show file tree
Hide file tree
Showing 5 changed files with 829 additions and 73 deletions.
63 changes: 50 additions & 13 deletions go/libraries/doltcore/sqle/index/index_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,25 @@ func (rp rangePartition) Key() []byte {
return rp.key
}

// GetDurableIndex resolves the durable state of |idx| against |tab| and
// returns its secondary durable.Index. Any error from resolving the durable
// state is returned unchanged.
//
// |idx| must be a *doltIndex; the type assertion is unchecked and panics for
// any other DoltIndex implementation.
func GetDurableIndex(ctx *sql.Context, tab DoltTableable, idx DoltIndex) (durable.Index, error) {
	state, err := idx.(*doltIndex).getDurableState(ctx, tab)
	if err != nil {
		return nil, err
	}
	return state.Secondary, nil
}

// IndexScanBuilder generates secondary lookups for partitions and
// encapsulates fast path optimizations for certain point lookups.
type IndexScanBuilder interface {
IndexRangeIterable

// NewPartitionRowIter returns a sql.RowIter for an index partition.
NewPartitionRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error)

// NewRangeMapIter returns a prolly.MapIter for an index partition.
NewRangeMapIter(ctx context.Context, r prolly.Range, reverse bool) (prolly.MapIter, error)

// NewSecondaryIter returns an object used to perform secondary lookups
// for index joins.
NewSecondaryIter(strict bool, cnt int, nullSafe []bool) SecondaryLookupIterGen
Expand All @@ -255,6 +265,11 @@ type IndexScanBuilder interface {
OutputSchema() schema.Schema
}

// IndexRangeIterable is the single-method capability of opening a
// prolly.MapIter over one prolly.Range.
type IndexRangeIterable interface {
	// NewRangeMapIter returns a prolly.MapIter for the index partition
	// described by |r|; |reverse| requests iteration in reverse order.
	NewRangeMapIter(ctx context.Context, r prolly.Range, reverse bool) (prolly.MapIter, error)
}

func NewIndexReaderBuilder(
ctx *sql.Context,
tab DoltTableable,
Expand Down Expand Up @@ -339,12 +354,11 @@ func newNonCoveringLookupBuilder(s *durableIndexState, b *baseIndexImplBuilder)
primary := durable.ProllyMapFromIndex(s.Primary)
priKd, _ := primary.Descriptors()
tbBld := val.NewTupleBuilder(priKd)
pkMap := ordinalMappingFromIndex(b.idx)
pkMap := OrdinalMappingFromIndex(b.idx)
keyProj, valProj, ordProj := projectionMappings(b.idx.Schema(), b.projections)
return &nonCoveringIndexImplBuilder{
baseIndexImplBuilder: b,
pri: primary,
priKd: priKd,
pkBld: tbBld,
pkMap: pkMap,
keyMap: keyProj,
Expand Down Expand Up @@ -439,18 +453,18 @@ type coveringIndexImplBuilder struct {
keyMap, valMap, ordMap val.OrdinalMapping
}

func NewSequenceRangeIter(ctx context.Context, ib IndexScanBuilder, ranges []prolly.Range, reverse bool) (prolly.MapIter, error) {
func NewSequenceRangeIter(ctx context.Context, irIter IndexRangeIterable, ranges []prolly.Range, reverse bool) (prolly.MapIter, error) {
if len(ranges) == 0 {
return &strictLookupIter{}, nil
}
// TODO: probably need to do something with Doltgres ranges here?
cur, err := ib.NewRangeMapIter(ctx, ranges[0], reverse)
cur, err := irIter.NewRangeMapIter(ctx, ranges[0], reverse)
if err != nil || len(ranges) < 2 {
return cur, err
}
return &sequenceRangeIter{
cur: cur,
ib: ib,
irIter: irIter,
reverse: reverse,
remainingRanges: ranges[1:],
}, nil
Expand All @@ -461,7 +475,7 @@ func NewSequenceRangeIter(ctx context.Context, ib IndexScanBuilder, ranges []pro
// an underlying map.
type sequenceRangeIter struct {
cur prolly.MapIter
ib IndexScanBuilder
irIter IndexRangeIterable
reverse bool
remainingRanges []prolly.Range
}
Expand All @@ -475,7 +489,7 @@ func (i *sequenceRangeIter) Next(ctx context.Context) (val.Tuple, val.Tuple, err
if len(i.remainingRanges) == 0 {
return nil, nil, io.EOF
}
i.cur, err = i.ib.NewRangeMapIter(ctx, i.remainingRanges[0], i.reverse)
i.cur, err = i.irIter.NewRangeMapIter(ctx, i.remainingRanges[0], i.reverse)
if err != nil {
return nil, nil, err
}
Expand All @@ -487,6 +501,23 @@ func (i *sequenceRangeIter) Next(ctx context.Context) (val.Tuple, val.Tuple, err
return k, v, nil
}

// secIterGen is an IndexRangeIterable that opens range iterators directly on
// a prolly.Map.
type secIterGen struct {
	// m is the map every range iterator is opened on.
	m prolly.Map
}

// NewSecondaryIterGen wraps |m| in an IndexRangeIterable whose range
// iterators are opened on |m|.
func NewSecondaryIterGen(m prolly.Map) IndexRangeIterable {
	var gen secIterGen
	gen.m = m
	return gen
}

// NewRangeMapIter implements IndexRangeIterable. It opens an iterator over
// the range |r| of the wrapped map, using IterRangeReverse when |reverse| is
// set and IterRange otherwise. The iterator and error from the map are
// returned unchanged.
func (si secIterGen) NewRangeMapIter(ctx context.Context, r prolly.Range, reverse bool) (prolly.MapIter, error) {
	if reverse {
		return si.m.IterRangeReverse(ctx, r)
	}
	return si.m.IterRange(ctx, r)
}

// OutputSchema returns the schema reported by IndexSchema on the builder's
// index.
func (ib *coveringIndexImplBuilder) OutputSchema() schema.Schema {
	return ib.idx.IndexSchema()
}
Expand Down Expand Up @@ -536,7 +567,6 @@ type nonCoveringIndexImplBuilder struct {
*baseIndexImplBuilder

pri prolly.Map
priKd val.TupleDesc
pkBld *val.TupleBuilder

pkMap, keyMap, valMap, ordMap val.OrdinalMapping
Expand Down Expand Up @@ -628,6 +658,13 @@ func (ib *nonCoveringIndexImplBuilder) NewSecondaryIter(strict bool, cnt int, nu
}
}

// NewKeylessIndexImplBuilder builds a keylessIndexImplBuilder for |idx| from
// an explicit pair of durable indexes: |pri| becomes the Primary and |sec|
// the Secondary of the builder's durable index state.
//
// |idx| must be a *doltIndex; the type assertion is unchecked and panics for
// any other DoltIndex implementation.
func NewKeylessIndexImplBuilder(pri, sec durable.Index, idx DoltIndex) *keylessIndexImplBuilder {
	base := &baseIndexImplBuilder{idx: idx.(*doltIndex)}
	state := &durableIndexState{Primary: pri, Secondary: sec}
	return &keylessIndexImplBuilder{
		baseIndexImplBuilder: base,
		s:                    state,
	}
}

// TODO keylessIndexImplBuilder should be similar to the non-covering
// index case, where we will need to reference the primary index,
// but can take advantage of point lookup optimizations
Expand All @@ -651,7 +688,7 @@ func (ib *keylessIndexImplBuilder) NewRangeMapIter(ctx context.Context, r prolly
}
clustered := durable.ProllyMapFromIndex(rows)
keyDesc := clustered.KeyDesc()
indexMap := ordinalMappingFromIndex(ib.idx)
indexMap := OrdinalMappingFromIndex(ib.idx)

keyBld := val.NewTupleBuilder(keyDesc)

Expand Down Expand Up @@ -721,7 +758,7 @@ func (ib *keylessIndexImplBuilder) NewSecondaryIter(strict bool, cnt int, nullSa
pri: pri,
sec: secondary,
sch: ib.idx.tableSch,
pkMap: ordinalMappingFromIndex(ib.idx),
pkMap: OrdinalMappingFromIndex(ib.idx),
pkBld: pkBld,
prefixDesc: secondary.KeyDesc().PrefixDesc(cnt),
}
Expand Down
6 changes: 3 additions & 3 deletions go/libraries/doltcore/sqle/index/prolly_index_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newProllyIndexIter(
primary := durable.ProllyMapFromIndex(dprimary)
kd, _ := primary.Descriptors()
pkBld := val.NewTupleBuilder(kd)
pkMap := ordinalMappingFromIndex(idx)
pkMap := OrdinalMappingFromIndex(idx)
keyProj, valProj, ordProj := projectionMappings(idx.Schema(), projections)

iter := prollyIndexIter{
Expand Down Expand Up @@ -135,7 +135,7 @@ func (p prollyIndexIter) Close(*sql.Context) error {
return nil
}

func ordinalMappingFromIndex(idx DoltIndex) (m val.OrdinalMapping) {
func OrdinalMappingFromIndex(idx DoltIndex) (m val.OrdinalMapping) {
def := idx.Schema().Indexes().GetByName(idx.ID())
pks := def.PrimaryKeyTags()
if len(pks) == 0 { // keyless index
Expand Down Expand Up @@ -314,7 +314,7 @@ func newProllyKeylessIndexIter(ctx *sql.Context, idx DoltIndex, rng prolly.Range

clustered := durable.ProllyMapFromIndex(rows)
keyDesc, valDesc := clustered.Descriptors()
indexMap := ordinalMappingFromIndex(idx)
indexMap := OrdinalMappingFromIndex(idx)
keyBld := val.NewTupleBuilder(keyDesc)
sch := idx.Schema()
_, vm, om := projectionMappings(sch, projections)
Expand Down
Loading

0 comments on commit d5534d1

Please sign in to comment.