Skip to content

Commit

Permalink
add single key call
Browse files Browse the repository at this point in the history
Currently, the namespace used by the unique generator uses the edge predicate to
validate. This shouldn't be the case; we should use the namespace
provided in the context.
  • Loading branch information
Harshil goel authored and harshil-goel committed Aug 3, 2024
1 parent 8e79c04 commit 873fddf
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 35 deletions.
48 changes: 48 additions & 0 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,54 @@ func TestAddMutation_mrjn1(t *testing.T) {
require.Equal(t, 0, ol.Length(txn.StartTs, 0))
}

// TestReadSingleValue writes a new value for one key at every even timestamp, periodically
// rolls the list up, and after each write reads the key back through
// LocalCache.GetSinglePosting at a range of read timestamps, passing the first returned
// posting's value to checkValue.
func TestReadSingleValue(t *testing.T) {
	defer setMaxListSize(maxListSize)
	maxListSize = math.MaxInt32

	// We call pl.Iterate and then stop iterating in the first loop when we are reading
	// single values. This test confirms that the single-value read path
	// (GetSinglePosting) and checkValue agree without an issue.

	const numTxns = 10000

	key := x.DataKey(x.GalaxyAttr("value"), 1240)
	ol, err := getNew(key, ps, math.MaxUint64)
	require.NoError(t, err)

	for i := 2; i <= numTxns; i += 2 {
		edge := &pb.DirectedEdge{
			Value: []byte("ho hey there" + strconv.Itoa(i)),
		}
		txn := Txn{StartTs: uint64(i)}
		addMutationHelper(t, ol, edge, Set, &txn)
		require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))

		// Persist the delta for this timestamp. Both the write and the flush can fail,
		// so both results are asserted.
		kData := ol.getMutation(uint64(i))
		writer := NewTxnWriter(pstore)
		require.NoError(t, writer.SetAt(key, kData, BitDeltaPosting, uint64(i)))
		require.NoError(t, writer.Flush())

		if i%10 == 0 {
			// Do frequent rollups, and store data in old timestamp
			kvs, err := ol.Rollup(nil, txn.StartTs-3)
			require.NoError(t, err)
			require.NoError(t, writePostingListToDisk(kvs))
			ol, err = getNew(key, ps, math.MaxUint64)
			require.NoError(t, err)
		}

		// Start reading at timestamp 2, or at ol.minTs when that is larger.
		j := 2
		if j < int(ol.minTs) {
			j = int(ol.minTs)
		}
		for ; j < i+6; j++ {
			tx := NewTxn(uint64(j))
			k, err := tx.cache.GetSinglePosting(key)
			require.NoError(t, err)
			// Fail the test cleanly instead of panicking on a nil or empty result.
			require.NotNil(t, k)
			require.NotEmpty(t, k.Postings)
			checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
		}
	}
}

func TestRollupMaxTsIsSet(t *testing.T) {
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32
Expand Down
71 changes: 71 additions & 0 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,77 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
return lc.SetIfAbsent(skey, pl), nil
}

// GetSinglePosting retrieves the posting list stored for the given key, reading it directly
// rather than through Get. This is used for retrieving the value of a scalar predicate.
//
// The lookup order is:
//  1. the delta recorded in lc.deltas for the key,
//  2. the entry at lc.startTs in the mutationMap of a list present in lc.plists,
//  3. the value stored in pstore, read at lc.startTs.
//
// It returns (nil, nil) when the key is not found in pstore, or when any posting satisfies
// hasDeleteAll. Otherwise the postings whose Op is Del are removed and the remaining
// postings are returned. Any other error from the store or from unmarshalling is returned
// as is.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
	// getList consults only the in-memory state of this cache, under the read lock.
	getList := func() *pb.PostingList {
		lc.RLock()
		defer lc.RUnlock()

		pl := &pb.PostingList{}
		if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
			// NOTE(review): a delta that fails to unmarshal is silently skipped and the
			// lookup falls through to plists/disk — confirm this is intentional.
			if err := pl.Unmarshal(delta); err == nil {
				return pl
			}
		}

		if l := lc.plists[string(key)]; l != nil {
			// If the current transaction is updating it, read it from here.
			// Otherwise read it from disk. TODO see if this can be fixed.
			return l.mutationMap[lc.startTs]
		}

		return nil
	}

	// getPostings falls back to the store when the cache has nothing for the key.
	getPostings := func() (*pb.PostingList, error) {
		if pl := getList(); pl != nil {
			return pl, nil
		}

		txn := pstore.NewTransactionAt(lc.startTs, false)
		// Read-only transactions must be discarded as well, otherwise they are leaked.
		defer txn.Discard()

		item, err := txn.Get(key)
		if err != nil {
			return nil, err
		}

		pl := &pb.PostingList{}
		err = item.Value(func(val []byte) error {
			return pl.Unmarshal(val)
		})
		return pl, err
	}

	pl, err := getPostings()
	if err == badger.ErrKeyNotFound {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	// Filter and remove STAR_ALL and OP_DELETE Postings.
	// NOTE(review): the filtering is done in place on pl.Postings; when pl came from
	// l.mutationMap this rewrites the slice held by that cached list — confirm intended.
	kept := 0
	for _, p := range pl.Postings {
		if hasDeleteAll(p) {
			return nil, nil
		}
		if p.Op != Del {
			pl.Postings[kept] = p
			kept++
		}
	}
	pl.Postings = pl.Postings[:kept]
	return pl, nil
}

// Get retrieves the cached version of the list associated with the given key.
func (lc *LocalCache) Get(key []byte) (*List, error) {
return lc.getInternal(key, true)
Expand Down
104 changes: 69 additions & 35 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
outputs := make([]*pb.Result, numGo)
listType := schema.State().IsList(q.Attr)

// These are certain special cases where we can get away with reading only the latest value.
// Lang doesn't work because we would be storing various different languages at various
// times, so when we go to read the latest value, we might get a different language.
// The same applies to DoCount, ExpandAll and Facets. List types are also not supported
// because a list is stored by time, and we combine all the list items at various timestamps.
hasLang := schema.State().HasLang(q.Attr)
getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil

calculate := func(start, end int) error {
x.AssertTrue(start%width == 0)
out := &pb.Result{}
Expand All @@ -434,49 +442,75 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
key := x.DataKey(q.Attr, q.UidList.Uids[i])

// Get or create the posting list for an entity, attribute combination.
pl, err := qs.cache.Get(key)
if err != nil {
return err
}

// If count is being requested, there is no need to populate value and facets matrix.
if q.DoCount {
count, err := countForValuePostings(args, pl, facetsTree, listType)
if err != nil && err != posting.ErrNoValue {
var vals []types.Val
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored

if !getMultiplePosting {
pl, err := qs.cache.GetSinglePosting(key)
if err != nil {
return err
}
if pl == nil || len(pl.Postings) == 0 {
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
continue
}
vals = make([]types.Val, len(pl.Postings))
for i, p := range pl.Postings {
vals[i] = types.Val{
Tid: types.TypeID(p.ValType),
Value: p.Value,
}
}
} else {
pl, err := qs.cache.Get(key)
if err != nil {
return err
}
out.Counts = append(out.Counts, uint32(count))
// Add an empty UID list to make later processing consistent.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
continue
}

vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType)
switch {
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
// This branch is taken when the value does not exist in the pl or
// the number of values retrieved is zero (there could still be facets).
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
// LangMatrix so that all these data structure have predictable layouts.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
// If count is being requested, there is no need to populate value and facets matrix.
if q.DoCount {
count, err := countForValuePostings(args, pl, facetsTree, listType)
if err != nil && err != posting.ErrNoValue {
return err
}
out.Counts = append(out.Counts, uint32(count))
// Add an empty UID list to make later processing consistent.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
continue
}
continue
case err != nil:
return err
}

if q.ExpandAll {
langTags, err := pl.GetLangTags(args.q.ReadTs)
if err != nil {
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)

switch {
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
// This branch is taken when the value does not exist in the pl or
// the number of values retrieved is zero (there could still be facets).
// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
// LangMatrix so that all these data structure have predictable layouts.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
}
continue
case err != nil:
return err
}
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})

if q.ExpandAll {
langTags, err := pl.GetLangTags(args.q.ReadTs)
if err != nil {
return err
}
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
}
}

uidList := new(pb.List)
Expand Down

0 comments on commit 873fddf

Please sign in to comment.