diff --git a/posting/index.go b/posting/index.go
index bb1f7191c3a..9be79d9a7bf 100644
--- a/posting/index.go
+++ b/posting/index.go
@@ -110,82 +110,85 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo)
 		return []*pb.DirectedEdge{}, errors.New("invalid UID with value 0")
 	}
-	inKey := x.DataKey(info.edge.Attr, uid)
-	pl, err := txn.Get(inKey)
-	if err != nil {
-		return []*pb.DirectedEdge{}, err
-	}
-	data, err := pl.AllValues(txn.StartTs)
-	if err != nil {
-		return []*pb.DirectedEdge{}, err
-	}
-
-	if info.op == pb.DirectedEdge_DEL &&
-		len(data) > 0 && data[0].Tid == types.VFloatID {
-		// TODO look into better alternatives
-		// The issue here is that we will create dead nodes in the Vector Index
-		// assuming an HNSW index type. What we should do instead is invoke
-		// index.Remove(). However, we currently do
-		// not support this in VectorIndex code!!
-		// if a delete & dealing with vfloats, add this to dead node in persistent store.
-		// What we should do instead is invoke the factory.Remove(key) operation.
-		deadAttr := hnsw.ConcatStrings(info.edge.Attr, hnsw.VecDead)
-		deadKey := x.DataKey(deadAttr, 1)
-		pl, err := txn.Get(deadKey)
+	if len(info.factorySpecs) > 0 {
+		inKey := x.DataKey(info.edge.Attr, uid)
+		pl, err := txn.Get(inKey)
+		if err != nil {
+			return []*pb.DirectedEdge{}, err
+		}
+		data, err := pl.AllValues(txn.StartTs)
 		if err != nil {
 			return []*pb.DirectedEdge{}, err
 		}
-		var deadNodes []uint64
-		deadData, _ := pl.Value(txn.StartTs)
-		if deadData.Value == nil {
-			deadNodes = append(deadNodes, uid)
-		} else {
-			deadNodes, err = hnsw.ParseEdges(string(deadData.Value.([]byte)))
+
+		if info.op == pb.DirectedEdge_DEL &&
+			len(data) > 0 && data[0].Tid == types.VFloatID {
+			// TODO look into better alternatives
+			// The issue here is that we will create dead nodes in the Vector Index
+			// assuming an HNSW index type. What we should do instead is invoke
+			// index.Remove(). However, we currently do
+			// not support this in VectorIndex code!!
+			// if a delete & dealing with vfloats, add this to dead node in persistent store.
+			// What we should do instead is invoke the factory.Remove(key) operation.
+			deadAttr := hnsw.ConcatStrings(info.edge.Attr, hnsw.VecDead)
+			deadKey := x.DataKey(deadAttr, 1)
+			pl, err := txn.Get(deadKey)
 			if err != nil {
 				return []*pb.DirectedEdge{}, err
 			}
-			deadNodes = append(deadNodes, uid)
-		}
-		deadNodesBytes, marshalErr := json.Marshal(deadNodes)
-		if marshalErr != nil {
-			return []*pb.DirectedEdge{}, marshalErr
-		}
-		edge := &pb.DirectedEdge{
-			Entity:    1,
-			Attr:      deadAttr,
-			Value:     deadNodesBytes,
-			ValueType: pb.Posting_ValType(0),
-		}
-		if err := pl.addMutation(ctx, txn, edge); err != nil {
-			return nil, err
+			var deadNodes []uint64
+			deadData, _ := pl.Value(txn.StartTs)
+			if deadData.Value == nil {
+				deadNodes = append(deadNodes, uid)
+			} else {
+				deadNodes, err = hnsw.ParseEdges(string(deadData.Value.([]byte)))
+				if err != nil {
+					return []*pb.DirectedEdge{}, err
+				}
+				deadNodes = append(deadNodes, uid)
+			}
+			deadNodesBytes, marshalErr := json.Marshal(deadNodes)
+			if marshalErr != nil {
+				return []*pb.DirectedEdge{}, marshalErr
+			}
+			edge := &pb.DirectedEdge{
+				Entity:    1,
+				Attr:      deadAttr,
+				Value:     deadNodesBytes,
+				ValueType: pb.Posting_ValType(0),
+			}
+			if err := pl.addMutation(ctx, txn, edge); err != nil {
+				return nil, err
+			}
 		}
-	}
-	// TODO: As stated earlier, we need to validate that it is okay to assume
-	// that we care about just data[0].
-	// Similarly, the current assumption is that we have at most one
-	// Vector Index, but this assumption may break later.
-	if info.op == pb.DirectedEdge_SET &&
-		len(data) > 0 && data[0].Tid == types.VFloatID &&
-		len(info.factorySpecs) > 0 {
-		// retrieve vector from inUuid save as inVec
-		inVec := types.BytesAsFloatArray(data[0].Value.([]byte))
-		tc := hnsw.NewTxnCache(NewViTxn(txn), txn.StartTs)
-		indexer, err := info.factorySpecs[0].CreateIndex(attr)
-		if err != nil {
-			return []*pb.DirectedEdge{}, err
-		}
-		edges, err := indexer.Insert(ctx, tc, uid, inVec)
-		if err != nil {
-			return []*pb.DirectedEdge{}, err
-		}
-		pbEdges := []*pb.DirectedEdge{}
-		for _, e := range edges {
-			pbe := indexEdgeToPbEdge(e)
-			pbEdges = append(pbEdges, pbe)
+		// TODO: As stated earlier, we need to validate that it is okay to assume
+		// that we care about just data[0].
+		// Similarly, the current assumption is that we have at most one
+		// Vector Index, but this assumption may break later.
+		if info.op == pb.DirectedEdge_SET &&
+			len(data) > 0 && data[0].Tid == types.VFloatID &&
+			len(info.factorySpecs) > 0 {
+			// retrieve vector from inUuid save as inVec
+			inVec := types.BytesAsFloatArray(data[0].Value.([]byte))
+			tc := hnsw.NewTxnCache(NewViTxn(txn), txn.StartTs)
+			indexer, err := info.factorySpecs[0].CreateIndex(attr)
+			if err != nil {
+				return []*pb.DirectedEdge{}, err
+			}
+			edges, err := indexer.Insert(ctx, tc, uid, inVec)
+			if err != nil {
+				return []*pb.DirectedEdge{}, err
+			}
+			pbEdges := []*pb.DirectedEdge{}
+			for _, e := range edges {
+				pbe := indexEdgeToPbEdge(e)
+				pbEdges = append(pbEdges, pbe)
+			}
+			return pbEdges, nil
 		}
-		return pbEdges, nil
 	}
+
 	tokens, err := indexTokens(ctx, info)
 	if err != nil {
 		// This data is not indexable
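The posting/index.go hunk above wraps the whole vector-index path in a len(info.factorySpecs) > 0 guard, so the extra posting-list read (txn.Get followed by pl.AllValues) is only paid for predicates that actually carry a vector index spec. The worker/sort_test.go hunk below adds a benchmark that drives runMutation against a plain term-indexed string predicate. Assuming the standard Go toolchain (this command is not part of the patch), the benchmark can be run in isolation with something like:

	go test ./worker -run '^$' -bench BenchmarkAddMutationWithIndex -benchmem

The -run '^$' filter skips the package's regular tests so that only the benchmark executes.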
diff --git a/worker/sort_test.go b/worker/sort_test.go
index 82000e61e43..8b6255a5813 100644
--- a/worker/sort_test.go
+++ b/worker/sort_test.go
@@ -17,11 +17,67 @@ package worker
 
 import (
+	"context"
+	"fmt"
+	"os"
 	"testing"
 
+	"github.com/dgraph-io/badger/v4"
+	"github.com/dgraph-io/dgraph/v24/posting"
+	"github.com/dgraph-io/dgraph/v24/protos/pb"
+	"github.com/dgraph-io/dgraph/v24/schema"
+	"github.com/dgraph-io/dgraph/v24/x"
 	"github.com/stretchr/testify/require"
 )
 
+func BenchmarkAddMutationWithIndex(b *testing.B) {
+	gr = new(groupi)
+	gr.gid = 1
+	gr.tablets = make(map[string]*pb.Tablet)
+	addTablets := func(attrs []string, gid uint32, namespace uint64) {
+		for _, attr := range attrs {
+			gr.tablets[x.NamespaceAttr(namespace, attr)] = &pb.Tablet{GroupId: gid}
+		}
+	}
+
+	addTablets([]string{"name", "name2", "age", "http://www.w3.org/2000/01/rdf-schema#range", "",
+		"friend", "dgraph.type", "dgraph.graphql.xid", "dgraph.graphql.schema"},
+		1, x.GalaxyNamespace)
+	addTablets([]string{"friend_not_served"}, 2, x.GalaxyNamespace)
+	addTablets([]string{"name"}, 1, 0x2)
+
+	dir, err := os.MkdirTemp("", "storetest_")
+	x.Check(err)
+	defer os.RemoveAll(dir)
+
+	opt := badger.DefaultOptions(dir)
+	ps, err := badger.OpenManaged(opt)
+	x.Check(err)
+	pstore = ps
+	// Not using posting list cache
+	posting.Init(ps, 0)
+	Init(ps)
+	err = schema.ParseBytes([]byte("benchmarkadd: string @index(term) ."), 1)
+	fmt.Println(err)
+	if err != nil {
+		panic(err)
+	}
+	ctx := context.Background()
+	txn := posting.Oracle().RegisterStartTs(5)
+	attr := x.GalaxyAttr("benchmarkadd")
+
+	for i := 0; i < b.N; i++ {
+		edge := &pb.DirectedEdge{
+			Value:  []byte("david"),
+			Attr:   attr,
+			Entity: 1,
+			Op:     pb.DirectedEdge_SET,
+		}
+
+		x.Check(runMutation(ctx, edge, txn))
+	}
+}
+
 func TestRemoveDuplicates(t *testing.T) {
 	toSet := func(uids []uint64) map[uint64]struct{} {
 		m := make(map[uint64]struct{})