diff --git a/posting/index.go b/posting/index.go index 3257583899f..a2e6cddbb0c 100644 --- a/posting/index.go +++ b/posting/index.go @@ -25,8 +25,10 @@ import ( "fmt" "math" "os" + "strings" "sync/atomic" "time" + "unsafe" "github.com/golang/glog" "github.com/pkg/errors" @@ -653,7 +655,7 @@ func (r *rebuilder) RunWithoutTemp(ctx context.Context) error { stream := pstore.NewStreamAt(r.startTs) stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr) stream.Prefix = r.prefix - stream.NumGo = 128 + stream.NumGo = 16 txn := NewTxn(r.startTs) stream.KeyToList = func(key []byte, it *badger.Iterator) (*bpb.KVList, error) { // We should return quickly if the context is no longer valid. @@ -742,6 +744,10 @@ func (r *rebuilder) RunWithoutTemp(ctx context.Context) error { return err } + if os.Getenv("DEBUG_SHOW_HNSW_TREE") != "" { + printTreeStats(txn) + } + txn.Update() writer := NewTxnWriter(pstore) @@ -763,6 +769,84 @@ func (r *rebuilder) RunWithoutTemp(ctx context.Context) error { }) } +func printTreeStats(txn *Txn) { + txn.cache.Lock() + + numLevels := 20 + numNodes := make([]int, numLevels) + numConnections := make([]int, numLevels) + + var temp [][]uint64 + for key, pl := range txn.cache.plists { + pk, _ := x.Parse([]byte(key)) + if strings.HasSuffix(pk.Attr, "__vector_") { + data := pl.getPosting(txn.cache.startTs) + if data == nil || len(data.Postings) == 0 { + continue + } + + err := decodeUint64MatrixUnsafe(data.Postings[0].Value, &temp) + if err != nil { + fmt.Println("Error while decoding", err) + } + + for i := 0; i < len(temp); i++ { + if len(temp[i]) > 0 { + numNodes[i] += 1 + } + numConnections[i] += len(temp[i]) + } + + } + } + + for i := 0; i < numLevels; i++ { + fmt.Printf("%d, ", numNodes[i]) + } + fmt.Println("") + for i := 0; i < numLevels; i++ { + fmt.Printf("%d, ", numConnections[i]) + } + fmt.Println("") + for i := 0; i < numLevels; i++ { + if numNodes[i] == 0 { + fmt.Printf("0, ") + continue + } + fmt.Printf("%d, ", numConnections[i]/numNodes[i]) + } + fmt.Println("") + + txn.cache.Unlock() +} + +func decodeUint64MatrixUnsafe(data []byte, matrix *[][]uint64) error { + if len(data) == 0 { + return nil + } + + offset := 0 + // Read number of rows + rows := *(*uint64)(unsafe.Pointer(&data[offset])) + offset += 8 + + *matrix = make([][]uint64, rows) + + for i := 0; i < int(rows); i++ { + // Read row length + rowLen := *(*uint64)(unsafe.Pointer(&data[offset])) + offset += 8 + + (*matrix)[i] = make([]uint64, rowLen) + for j := 0; j < int(rowLen); j++ { + (*matrix)[i][j] = *(*uint64)(unsafe.Pointer(&data[offset])) + offset += 8 + } + } + + return nil +} + func (r *rebuilder) Run(ctx context.Context) error { if r.startTs == 0 { glog.Infof("maxassigned is 0, no indexing work for predicate %s", r.attr) diff --git a/posting/list.go b/posting/list.go index fe5acde126f..020bb17c362 100644 --- a/posting/list.go +++ b/posting/list.go @@ -545,6 +545,16 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed return nil } +// getMutation returns a marshaled version of posting list mutation stored internally. +func (l *List) getPosting(startTs uint64) *pb.PostingList { + l.RLock() + defer l.RUnlock() + if pl, ok := l.mutationMap[startTs]; ok { + return pl + } + return nil +} + // getMutation returns a marshaled version of posting list mutation stored internally. func (l *List) getMutation(startTs uint64) []byte { l.RLock() diff --git a/posting/lists.go b/posting/lists.go index 8ca5ea427f9..c8134b77440 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -143,9 +143,17 @@ func (vc *viLocalCache) GetWithLockHeld(key []byte) (rval index.Value, rerr erro } func (vc *viLocalCache) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) { - val, err := pl.ValueWithLockHeld(vc.delegate.startTs) - rval = val.Value - return rval, err + value := pl.findStaticValue(vc.delegate.startTs) + + if value == nil { + return nil, ErrNoValue + } + + if hasDeleteAll(value.Postings[0]) || value.Postings[0].Op == Del { + return nil, ErrNoValue + } + + return value.Postings[0].Value, nil } func NewViLocalCache(delegate *LocalCache) *viLocalCache { @@ -280,12 +288,16 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List { } func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) { + skey := string(key) getNewPlistNil := func() (*List, error) { lc.RLock() defer lc.RUnlock() if lc.plists == nil { return getNew(key, pstore, lc.startTs) } + if l, ok := lc.plists[skey]; ok { + return l, nil + } return nil, nil } @@ -293,11 +305,6 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) return l, err } - skey := string(key) - if pl := lc.getNoStore(skey); pl != nil { - return pl, nil - } - var pl *List if readFromDisk { var err error diff --git a/posting/oracle.go b/posting/oracle.go index 57cf6f317b9..81ab8b80325 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -103,9 +103,18 @@ func (vt *viTxn) GetWithLockHeld(key []byte) (rval index.Value, rerr error) { } func (vt *viTxn) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) { - val, err := pl.ValueWithLockHeld(vt.delegate.StartTs) - rval = val.Value - return rval, err + value := pl.findStaticValue(vt.delegate.StartTs) + + if value == nil { + //fmt.Println("DIFF", val, err, nil, badger.ErrKeyNotFound) + return nil, ErrNoValue + } + + if hasDeleteAll(value.Postings[0]) || value.Postings[0].Op == Del { + return nil, ErrNoValue + } + + return value.Postings[0].Value, nil } func (vt *viTxn) AddMutation(ctx context.Context, key []byte, t *index.KeyValue) error { diff --git a/query/math.go b/query/math.go index ae4f06c0902..f4e45b2d608 100644 --- a/query/math.go +++ b/query/math.go @@ -84,6 +84,25 @@ func processBinary(mNode *mathTree) error { return nil } + // If mpl or mpr have 0 and just 0 in it, that means it's an output of aggregation somewhere. + // This value would need to be applied to all. + checkAggrResult := func(value map[uint64]types.Val) (types.Val, bool) { + if len(value) != 1 { + return types.Val{}, false + } + + val, ok := value[0] + return val, ok + } + + if val, ok := checkAggrResult(mpl); ok { + cl = val + mpl = nil + } else if val, ok := checkAggrResult(mpr); ok { + cr = val + mpr = nil + } + if len(mpl) != 0 || len(mpr) != 0 { for k := range mpr { if err := f(k); err != nil { diff --git a/query/math_test.go b/query/math_test.go index ae57c9792ef..5b41569cc7d 100644 --- a/query/math_test.go +++ b/query/math_test.go @@ -26,6 +26,53 @@ import ( "github.com/dgraph-io/dgraph/types" ) +func TestVector(t *testing.T) { + tree := &mathTree{ + Fn: "sqrt", + Child: []*mathTree{{ + Fn: "dot", + Child: []*mathTree{ + { + Fn: "-", + Child: []*mathTree{ + { + Var: "v1", + Val: map[uint64]types.Val{ + 0: {Tid: 12, Value: []float32{1.0, 2.0}}, + }, + }, + { + Var: "v2", + Val: map[uint64]types.Val{ + 123: {Tid: 12, Value: []float32{1.0, 2.0}}, + }, + }, + }, + }, + { + Fn: "-", + Child: []*mathTree{ + { + Var: "v1", + Val: map[uint64]types.Val{ + 0: {Tid: 12, Value: []float32{1.0, 2.0}}, + }, + }, + { + Var: "v2", + Val: map[uint64]types.Val{ + 123: {Tid: 12, Value: []float32{1.0, 2.0}}, + }, + }, + }, + }, + }, + }}, + } + + require.NoError(t, evalMathTree(tree)) +} + func TestProcessBinary(t *testing.T) { tests := []struct { in *mathTree diff --git a/query/query.go b/query/query.go index fcb61f48959..7db07ba4c07 100644 --- a/query/query.go +++ b/query/query.go @@ -1185,6 +1185,7 @@ func (sg *SubGraph) transformVars(doneVars map[string]varValue, path []*SubGraph mt.Const = val continue } + mt.Val = newMap } return nil diff --git a/query/vector/vector_test.go b/query/vector/vector_test.go index eb5568d42c6..c19a8bfc4b1 100644 --- a/query/vector/vector_test.go +++ b/query/vector/vector_test.go @@ -598,10 +598,10 @@ func TestVectorDelete(t *testing.T) { deleteTriplesInCluster(triple) uid := strings.Split(triple, " ")[0] query = fmt.Sprintf(`{ - vector(func: uid(%s)) { - vtest - } - }`, uid[1:len(uid)-1]) + vector(func: uid(%s)) { + vtest + } + }`, uid[1:len(uid)-1]) result = processQueryNoErr(t, query) require.JSONEq(t, `{"data": {"vector":[]}}`, result) diff --git a/tok/hnsw/heap.go b/tok/hnsw/heap.go index da165f835ae..fee786dd607 100644 --- a/tok/hnsw/heap.go +++ b/tok/hnsw/heap.go @@ -1,3 +1,21 @@ +/* + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com + */ + package hnsw import ( diff --git a/tok/hnsw/helper.go b/tok/hnsw/helper.go index b58a911f072..6111e05f355 100644 --- a/tok/hnsw/helper.go +++ b/tok/hnsw/helper.go @@ -1,6 +1,25 @@ +/* + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com + */ + package hnsw import ( + "container/heap" "context" "encoding/binary" "fmt" @@ -518,6 +537,56 @@ func (ph *persistentHNSW[T]) newPersistentEdgeKeyValueEntry(ctx context.Context, return edge, nil } +func (ph *persistentHNSW[T]) distance_betw(ctx context.Context, tc *TxnCache, inUuid, outUuid uint64, inVec, + outVec *[]T) T { + err := ph.getVecFromUid(outUuid, tc, outVec) + if err != nil { + log.Printf("[ERROR] While getting vector %s", err) + return -1 + } + + d, err := ph.simType.distanceScore(*inVec, *outVec, ph.floatBits) + if err != nil { + log.Printf("[ERROR] While getting vector %s", err) + return -1 + } + return d +} + +type HeapDataHolder struct { + data []uint64 + compare func(a, b uint64) bool +} + +// Len is the number of elements in the collection. +func (h HeapDataHolder) Len() int { + return len(h.data) +} + +// Less reports whether the element with index i should sort before the element with index j. +func (h HeapDataHolder) Less(i, j int) bool { + return h.compare(h.data[i], h.data[j]) +} + +// Swap swaps the elements with indexes i and j. +func (h HeapDataHolder) Swap(i, j int) { + h.data[i], h.data[j] = h.data[j], h.data[i] +} + +// Push adds an element to the heap. +func (h *HeapDataHolder) Push(x interface{}) { + h.data = append(h.data, x.(uint64)) +} + +// Pop removes and returns the maximum element from the heap. +func (h *HeapDataHolder) Pop() interface{} { + old := h.data + n := len(old) + x := old[n-1] + h.data = old[0 : n-1] + return x +} + // addNeighbors adds the neighbors of the given uuid to the given level. // It returns the edge created and the error if any. func (ph *persistentHNSW[T]) addNeighbors(ctx context.Context, tc *TxnCache, @@ -544,6 +613,7 @@ func (ph *persistentHNSW[T]) addNeighbors(ctx context.Context, tc *TxnCache, } } } + var inVec, outVec []T for level := 0; level < ph.maxLevels; level++ { allLayerEdges[level], nnEdgesErr = ph.removeDeadNodes(allLayerEdges[level], tc) if nnEdgesErr != nil { @@ -551,6 +621,25 @@ func (ph *persistentHNSW[T]) addNeighbors(ctx context.Context, tc *TxnCache, } // This adds at most efConstruction number of edges for each layer for this node allLayerEdges[level] = append(allLayerEdges[level], allLayerNeighbors[level]...) + if len(allLayerEdges[level]) > ph.efConstruction { + err := ph.getVecFromUid(uuid, tc, &inVec) + if err != nil { + log.Printf("[ERROR] While getting vector %s", err) + } else { + h := &HeapDataHolder{ + data: allLayerEdges[level], + compare: func(i, j uint64) bool { + return ph.distance_betw(ctx, tc, uuid, i, &inVec, &outVec) > + ph.distance_betw(ctx, tc, uuid, j, &inVec, &outVec) + }} + + for _, e := range allLayerNeighbors[level] { + heap.Push(h, e) + heap.Pop(h) + } + } + allLayerEdges[level] = allLayerEdges[level][:ph.efConstruction] + } } // on every modification of the layer edges, add it to in mem map so you dont have to always be reading diff --git a/tok/hnsw/persistent_factory.go b/tok/hnsw/persistent_factory.go index d391744195e..2b353e1a25b 100644 --- a/tok/hnsw/persistent_factory.go +++ b/tok/hnsw/persistent_factory.go @@ -1,3 +1,21 @@ +/* + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com + */ + package hnsw import ( diff --git a/tok/hnsw/persistent_hnsw.go b/tok/hnsw/persistent_hnsw.go index 00b55552e04..f6a33585bd4 100644 --- a/tok/hnsw/persistent_hnsw.go +++ b/tok/hnsw/persistent_hnsw.go @@ -1,3 +1,21 @@ +/* + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com + */ + package hnsw import ( diff --git a/tok/hnsw/persistent_hnsw_test.go b/tok/hnsw/persistent_hnsw_test.go index 2befa742030..11adcbea90e 100644 --- a/tok/hnsw/persistent_hnsw_test.go +++ b/tok/hnsw/persistent_hnsw_test.go @@ -1,3 +1,21 @@ +/* + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com + */ + package hnsw import ( diff --git a/tok/hnsw/search_layer.go b/tok/hnsw/search_layer.go index d01a864d063..2e39472dcc6 100644 --- a/tok/hnsw/search_layer.go +++ b/tok/hnsw/search_layer.go @@ -1,3 +1,21 @@ +/* + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com + */ + package hnsw import ( diff --git a/tok/hnsw/test_helper.go b/tok/hnsw/test_helper.go index 29f1412254a..69f851e659e 100644 --- a/tok/hnsw/test_helper.go +++ b/tok/hnsw/test_helper.go @@ -1,3 +1,21 @@ +/* + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com + */ + package hnsw import ( diff --git a/tok/index/helper.go b/tok/index/helper.go index de56ac5023d..2d08897c18b 100644 --- a/tok/index/helper.go +++ b/tok/index/helper.go @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 Dgraph Labs, Inc. and Contributors + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,6 +12,8 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com */ package index diff --git a/tok/index/helper_test.go b/tok/index/helper_test.go index 1968ae8d4d3..0dc0ab1a7b3 100644 --- a/tok/index/helper_test.go +++ b/tok/index/helper_test.go @@ -12,6 +12,8 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com */ package index diff --git a/tok/index/index.go b/tok/index/index.go index 9d51b37ad55..e5c46894ba2 100644 --- a/tok/index/index.go +++ b/tok/index/index.go @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 Dgraph Labs, Inc. and Contributors + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,6 +12,8 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Co-authored by: jai@hypermode.com, sunil@hypermode.com, bill@hypdermode.com */ package index diff --git a/tok/index/search_path.go b/tok/index/search_path.go index b95f2e5a8e7..761971d9d98 100644 --- a/tok/index/search_path.go +++ b/tok/index/search_path.go @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 Dgraph Labs, Inc. and Contributors + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,6 +12,8 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com */ package index diff --git a/tok/index/types.go b/tok/index/types.go index a9bac545b43..254b292ed8d 100644 --- a/tok/index/types.go +++ b/tok/index/types.go @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 Dgraph Labs, Inc. and Contributors + * Copyright 2016-2024 Dgraph Labs, Inc. and Contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,6 +12,8 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Co-authored by: jairad26@gmail.com, sunil@hypermode.com, bill@hypdermode.com */ package index