diff --git a/posting/list.go b/posting/list.go index bd5caf12b08..45527884d25 100644 --- a/posting/list.go +++ b/posting/list.go @@ -107,14 +107,6 @@ func newMutableMap() *MutableMap { } } -func (mm *MutableMap) getDeleteMarker() uint64 { - if mm == nil { - return 0 - } - - return mm.deleteMarker -} - func (mm *MutableMap) clone() *MutableMap { if mm == nil { return nil @@ -234,9 +226,9 @@ func (mm *MutableMap) _iterate(f func(ts uint64, pl *pb.PostingList)) { // Before iterating, we have to figure out where the last delete marker is // Then gather the posts that would be above the marker -func (mm *MutableMap) iterate(f func(ts uint64, pl *pb.PostingList), readTs uint64) { +func (mm *MutableMap) iterate(f func(ts uint64, pl *pb.PostingList), readTs uint64) uint64 { if mm == nil { - return + return 0 } deleteMarker := mm.populateDeleteAll(readTs) @@ -245,6 +237,7 @@ func (mm *MutableMap) iterate(f func(ts uint64, pl *pb.PostingList), readTs uint f(ts, pl) } }) + return deleteMarker } func (mm *MutableMap) insertOldPosting(pl *pb.PostingList) { @@ -863,7 +856,7 @@ func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) { // First pick up the postings. var posts []*pb.Posting - l.mutationMap.iterate(func(ts uint64, plist *pb.PostingList) { + deleteMarker := l.mutationMap.iterate(func(ts uint64, plist *pb.PostingList) { // ts will be plist.CommitTs for commited transactions // ts will be readTs for mutations that are me posts = append(posts, plist.Postings...) @@ -880,7 +873,7 @@ func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) { } return pi.Uid < pj.Uid }) - return l.mutationMap.getDeleteMarker(), posts + return deleteMarker, posts } func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error { diff --git a/posting/list_test.go b/posting/list_test.go index f312b096228..0de875c2727 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -493,7 +493,7 @@ func TestAddMutation_mrjn1(t *testing.T) { addMutationHelper(t, ol, edge, Set, txn) checkValue(t, ol, "cars", txn.StartTs) - // Delete it again, just for fun. + // delete it again, just for fun. edge = &pb.DirectedEdge{ Value: []byte("cars"), } @@ -1542,6 +1542,32 @@ func TestSingleListRollup(t *testing.T) { // TODO: Need more testing here. } +func TestListDeleteMarker(t *testing.T) { + defer setMaxListSize(maxListSize) + maxListSize = mb / 2 + + key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 13345) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) + require.NoError(t, err) + + txn := Txn{StartTs: uint64(1)} + edge := &pb.DirectedEdge{ + Entity: 1, + ValueId: 1, + } + addMutationHelper(t, ol, edge, Set, &txn) + require.NoError(t, ol.commitMutation(1, 2)) + + txn = Txn{StartTs: uint64(3)} + edge = &pb.DirectedEdge{ + Value: []byte(x.Star), + Op: pb.DirectedEdge_DEL, + } + addMutationHelper(t, ol, edge, Del, &txn) + + require.Equal(t, ol.mutationMap.populateDeleteAll(3), uint64(3)) +} + func TestRecursiveSplits(t *testing.T) { // For testing, set the max list size to a lower threshold. defer setMaxListSize(maxListSize) diff --git a/worker/sort_test.go b/worker/sort_test.go index a60200f96d8..cdaeef61a82 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -73,14 +73,11 @@ func TestSingleUid(t *testing.T) { ps, err := badger.OpenManaged(opt) x.Check(err) pstore = ps - // Not using posting list cache posting.Init(ps, 0) Init(ps) err = schema.ParseBytes([]byte("singleUidTest: string @index(exact) @unique ."), 1) - fmt.Println(err) - if err != nil { - panic(err) - } + require.NoError(t, err) + ctx := context.Background() txn := posting.Oracle().RegisterStartTs(5) attr := x.GalaxyAttr("singleUidTest") @@ -160,6 +157,7 @@ func TestSingleUid(t *testing.T) { found, mpost, err = l.FindPosting(18, 1) require.NoError(t, err) require.Equal(t, found, false) + require.Equal(t, mpost, nil) } func TestLangExact(t *testing.T) { @@ -201,16 +199,21 @@ func TestLangExact(t *testing.T) { x.Check(runMutation(ctx, edge, txn)) - pl, err := txn.Get(x.DataKey(attr, 1)) - x.Check(err) + txn.Update() + writer := posting.NewTxnWriter(pstore) + require.NoError(t, txn.CommitToDisk(writer, 2)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(2) - fmt.Println(pl.Value(5)) + key := x.DataKey(attr, 1) + rollup(t, key, pstore, 4) - err = pl.Iterate(5, 0, func(p *pb.Posting) error { - fmt.Println("INSIDE MAIN ITERATE", p) - return nil - }) - x.Check(err) + pl, err := readPostingListFromDisk(key, pstore, 6) + require.NoError(t, err) + + val, err := pl.ValueForTag(6, "hi") + require.NoError(t, err) + require.Equal(t, val.Value, []byte("hindi")) } func BenchmarkAddMutationWithIndex(b *testing.B) {