diff --git a/posting/index.go b/posting/index.go index c9eaecf2a56..38d3e4c6094 100644 --- a/posting/index.go +++ b/posting/index.go @@ -1511,11 +1511,13 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error { // DeleteAll deletes all entries in the posting list. func DeleteAll() error { + ResetCache() return pstore.DropAll() } // DeleteData deletes all data for the namespace but leaves types and schema intact. func DeleteData(ns uint64) error { + ResetCache() prefix := make([]byte, 9) prefix[0] = x.DefaultPrefix binary.BigEndian.PutUint64(prefix[1:], ns) @@ -1525,6 +1527,7 @@ func DeleteData(ns uint64) error { // DeletePredicate deletes all entries and indices for a given predicate. func DeletePredicate(ctx context.Context, attr string, ts uint64) error { glog.Infof("Dropping predicate: [%s]", attr) + ResetCache() preds := schema.State().PredicatesToDelete(attr) for _, pred := range preds { prefix := x.PredicatePrefix(pred) @@ -1541,6 +1544,8 @@ func DeletePredicate(ctx context.Context, attr string, ts uint64) error { // DeleteNamespace bans the namespace and deletes its predicates/types from the schema. func DeleteNamespace(ns uint64) error { + // TODO: We should only delete cache for certain keys, not all the keys. + ResetCache() schema.State().DeletePredsForNs(ns) return pstore.BanNamespace(ns) } diff --git a/posting/list.go b/posting/list.go index 39bd6e56e21..8c86562ad96 100644 --- a/posting/list.go +++ b/posting/list.go @@ -557,6 +557,27 @@ func (l *List) getMutation(startTs uint64) []byte { return nil } +func (l *List) setMutationAfterCommit(startTs, commitTs uint64, data []byte) { + pl := new(pb.PostingList) + x.Check(pl.Unmarshal(data)) + pl.CommitTs = commitTs + for _, p := range pl.Postings { + p.CommitTs = commitTs + } + + x.AssertTrue(pl.Pack == nil) + + l.Lock() + if l.mutationMap == nil { + l.mutationMap = make(map[uint64]*pb.PostingList) + } + l.mutationMap[startTs] = pl + if pl.CommitTs != 0 { + l.maxTs = x.Max(l.maxTs, pl.CommitTs) + } + l.Unlock() +} + func (l *List) setMutation(startTs uint64, data []byte) { pl := new(pb.PostingList) x.Check(pl.Unmarshal(data)) @@ -566,6 +587,9 @@ func (l *List) setMutation(startTs uint64, data []byte) { l.mutationMap = make(map[uint64]*pb.PostingList) } l.mutationMap[startTs] = pl + if pl.CommitTs != 0 { + l.maxTs = x.Max(l.maxTs, pl.CommitTs) + } l.Unlock() } @@ -783,6 +807,38 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) { return count == 0, nil } +func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) { + l.AssertRLock() + + dec := codec.Decoder{Pack: l.plist.Pack} + uids := dec.Seek(uid, codec.SeekStart) + length := codec.ExactLen(l.plist.Pack) + found := len(uids) > 0 && uids[0] == uid + + for _, plist := range l.mutationMap { + for _, mpost := range plist.Postings { + if (mpost.CommitTs > 0 && mpost.CommitTs <= readTs) || (mpost.StartTs == readTs) { + if hasDeleteAll(mpost) { + found = false + length = 0 + continue + } + if mpost.Uid == uid { + found = (mpost.Op == Set) + } + if mpost.Op == Set { + length += 1 + } else { + length -= 1 + } + + } + } + } + + return length, found, nil +} + func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) { l.AssertRLock() var count int @@ -816,38 +872,6 @@ func (l *List) length(readTs, afterUid uint64) int { return count } -func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) { - l.AssertRLock() - - dec := codec.Decoder{Pack: l.plist.Pack} - uids := dec.Seek(uid, codec.SeekStart) - length := codec.ExactLen(l.plist.Pack) - found1 := len(uids) > 0 && uids[0] == uid - - for _, plist := range l.mutationMap { - for _, mpost := range plist.Postings { - if (mpost.CommitTs > 0 && mpost.CommitTs <= readTs) || (mpost.StartTs == readTs) { - if hasDeleteAll(mpost) { - found1 = false - length = 0 - continue - } - if mpost.Uid == uid { - found1 = (mpost.Op == Set) - } - if mpost.Op == Set { - length += 1 - } else { - length -= 1 - } - - } - } - } - - return length, found1, nil -} - // Length iterates over the mutation layer and counts number of elements. func (l *List) Length(readTs, afterUid uint64) int { l.RLock() @@ -1183,6 +1207,8 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) { } if len(out.plist.Splits) > 0 || len(l.mutationMap) > 0 { + // In case there were splits, this would read all the splits from + // Badger. if err := l.encode(out, readTs, split); err != nil { return nil, errors.Wrapf(err, "while encoding") } diff --git a/posting/list_test.go b/posting/list_test.go index 5256938e5d1..45c0d963262 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -42,6 +42,21 @@ func setMaxListSize(newMaxListSize int) { maxListSize = newMaxListSize } +func readPostingListFromDisk(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + txn := pstore.NewTransactionAt(readTs, false) + defer txn.Discard() + + // When we do rollups, an older version would go to the top of the LSM tree, which can cause + // issues during txn.Get. Therefore, always iterate. + iterOpts := badger.DefaultIteratorOptions + iterOpts.AllVersions = true + iterOpts.PrefetchValues = false + itr := txn.NewKeyIterator(key, iterOpts) + defer itr.Close() + itr.Seek(key) + return ReadPostingList(key, itr) +} + func (l *List) PostingList() *pb.PostingList { l.RLock() defer l.RUnlock() @@ -177,7 +192,7 @@ func checkValue(t *testing.T, ol *List, val string, readTs uint64) { // TODO(txn): Add tests after lru eviction func TestAddMutation_Value(t *testing.T) { key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 10) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) edge := &pb.DirectedEdge{ Value: []byte("oh hey there"), @@ -440,7 +455,7 @@ func TestRollupMaxTsIsSet(t *testing.T) { maxListSize = math.MaxInt32 key := x.DataKey(x.GalaxyAttr("bal"), 1333) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) var commits int N := int(1e6) @@ -461,7 +476,7 @@ func TestRollupMaxTsIsSet(t *testing.T) { } require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } commits++ @@ -474,7 +489,7 @@ func TestMillion(t *testing.T) { maxListSize = math.MaxInt32 key := x.DataKey(x.GalaxyAttr("bal"), 1331) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) var commits int N := int(1e6) @@ -492,7 +507,7 @@ func TestMillion(t *testing.T) { kvs, err := ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } commits++ @@ -512,7 +527,7 @@ func TestMillion(t *testing.T) { func TestAddMutation_mrjn2(t *testing.T) { ctx := context.Background() key := x.DataKey(x.GalaxyAttr("bal"), 1001) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) var readTs uint64 for readTs = 1; readTs < 10; readTs++ { @@ -587,7 +602,7 @@ func TestAddMutation_mrjn2(t *testing.T) { func TestAddMutation_gru(t *testing.T) { key := x.DataKey(x.GalaxyAttr("question.tag"), 0x01) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) { @@ -620,7 +635,7 @@ func TestAddMutation_gru(t *testing.T) { func TestAddMutation_gru2(t *testing.T) { key := x.DataKey(x.GalaxyAttr("question.tag"), 0x100) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) { @@ -667,7 +682,7 @@ func TestAddAndDelMutation(t *testing.T) { // Ensure each test uses unique key since we don't clear the postings // after each test key := x.DataKey(x.GalaxyAttr("dummy_key"), 0x927) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) { @@ -695,7 +710,7 @@ func TestAddAndDelMutation(t *testing.T) { func TestAfterUIDCount(t *testing.T) { key := x.DataKey(x.GalaxyAttr("value"), 22) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) // Set value to cars and merge to BadgerDB. edge := &pb.DirectedEdge{} @@ -766,7 +781,7 @@ func TestAfterUIDCount(t *testing.T) { func TestAfterUIDCount2(t *testing.T) { key := x.DataKey(x.GalaxyAttr("value"), 23) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) // Set value to cars and merge to BadgerDB. @@ -793,7 +808,7 @@ func TestAfterUIDCount2(t *testing.T) { func TestDelete(t *testing.T) { key := x.DataKey(x.GalaxyAttr("value"), 25) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) // Set value to cars and merge to BadgerDB. @@ -815,7 +830,7 @@ func TestDelete(t *testing.T) { func TestAfterUIDCountWithCommit(t *testing.T) { key := x.DataKey(x.GalaxyAttr("value"), 26) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) // Set value to cars and merge to BadgerDB. @@ -906,7 +921,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) { maxListSize = 5000 key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) commits := 0 for i := 1; i <= size; i++ { @@ -926,7 +941,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) { kvs, err := ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } commits++ @@ -938,7 +953,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) { require.Equal(t, uint64(size+1), kv.Version) } require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) require.Nil(t, ol.plist.Pack) require.Equal(t, 0, len(ol.plist.Postings)) @@ -954,7 +969,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { maxListSize = 10000 key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) commits := 0 for i := 1; i <= size; i++ { @@ -969,7 +984,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { kvs, err := ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } commits++ @@ -990,7 +1005,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { kvs, err := ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } commits++ @@ -1002,7 +1017,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { func TestLargePlistSplit(t *testing.T) { key := x.DataKey(uuid.New().String(), 1331) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) b := make([]byte, 30<<20) _, _ = rand.Read(b) @@ -1019,7 +1034,7 @@ func TestLargePlistSplit(t *testing.T) { _, err = ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) b = make([]byte, 10<<20) _, _ = rand.Read(b) @@ -1038,7 +1053,7 @@ func TestLargePlistSplit(t *testing.T) { kvs, err := ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) // require.Nil(t, ol.plist.Bitmap) require.Equal(t, 0, len(ol.plist.Postings)) @@ -1113,7 +1128,7 @@ func TestBinSplit(t *testing.T) { maxListSize = originalListSize }() key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) for i := 1; i <= size; i++ { edge := &pb.DirectedEdge{ @@ -1131,7 +1146,7 @@ func TestBinSplit(t *testing.T) { require.Equal(t, uint64(size+1), kv.Version) } require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) require.Equal(t, 0, len(ol.plist.Splits)) require.Equal(t, size, len(ol.plist.Postings)) @@ -1245,7 +1260,7 @@ func TestMultiPartListWriteToDisk(t *testing.T) { require.Equal(t, len(kvs), len(originalList.plist.Splits)+1) require.NoError(t, writePostingListToDisk(kvs)) - newList, err := getNew(kvs[0].Key, ps, math.MaxUint64) + newList, err := readPostingListFromDisk(kvs[0].Key, ps, math.MaxUint64) require.NoError(t, err) opt := ListOptions{ReadTs: uint64(size) + 1} @@ -1294,7 +1309,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { // Add entries to the maps. key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) for i := 1; i <= size; i++ { edge := &pb.DirectedEdge{ @@ -1308,7 +1323,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { kvs, err := ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } } @@ -1335,7 +1350,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { kvs, err := ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } } @@ -1344,7 +1359,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { kvs, err := ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) for _, kv := range kvs { require.Equal(t, baseStartTs+uint64(1+size/2), kv.Version) @@ -1372,7 +1387,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { kvs, err := ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } } @@ -1381,7 +1396,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { kvs, err = ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) // Verify all entries are once again in the list. @@ -1435,7 +1450,7 @@ func TestRecursiveSplits(t *testing.T) { // Create a list that should be split recursively. size := int(1e5) key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) - ol, err := getNew(key, ps, math.MaxUint64) + ol, err := readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) commits := 0 for i := 1; i <= size; i++ { @@ -1457,7 +1472,7 @@ func TestRecursiveSplits(t *testing.T) { kvs, err := ol.Rollup(nil, math.MaxUint64) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) require.True(t, len(ol.plist.Splits) > 2) @@ -1496,7 +1511,7 @@ func TestMain(m *testing.M) { func BenchmarkAddMutations(b *testing.B) { key := x.DataKey(x.GalaxyAttr("name"), 1) - l, err := getNew(key, ps, math.MaxUint64) + l, err := readPostingListFromDisk(key, ps, math.MaxUint64) if err != nil { b.Error(err) } diff --git a/posting/lists.go b/posting/lists.go index e813ebfd303..f00995ef8f2 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -49,10 +49,12 @@ func Init(ps *badger.DB, cacheSize int64) { pstore = ps closer = z.NewCloser(1) go x.MonitorMemoryMetrics(closer) + // Initialize cache. if cacheSize == 0 { return } + var err error lCache, err = ristretto.NewCache(&ristretto.Config{ // Use 5% of cache memory for storing counters. @@ -61,11 +63,7 @@ func Init(ps *badger.DB, cacheSize int64) { BufferItems: 64, Metrics: true, Cost: func(val interface{}) int64 { - l, ok := val.(*List) - if !ok { - return int64(0) - } - return int64(l.DeepSize()) + return 0 }, }) x.Check(err) @@ -101,7 +99,8 @@ func GetNoStore(key []byte, readTs uint64) (rlist *List, err error) { type LocalCache struct { sync.RWMutex - startTs uint64 + startTs uint64 + commitTs uint64 // The keys for these maps is a string representation of the Badger key for the posting list. // deltas keep track of the updates made by txn. These must be kept around until written to disk @@ -169,6 +168,12 @@ func NoCache(startTs uint64) *LocalCache { return &LocalCache{startTs: startTs} } +func (lc *LocalCache) UpdateCommitTs(commitTs uint64) { + lc.Lock() + defer lc.Unlock() + lc.commitTs = commitTs +} + func (lc *LocalCache) Find(pred []byte, filter func([]byte) bool) (uint64, error) { txn := pstore.NewTransactionAt(lc.startTs, false) defer txn.Discard() @@ -339,6 +344,8 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() { } for key, pl := range lc.plists { + //pk, _ := x.Parse([]byte(key)) + //fmt.Printf("{TXN} Closing %v\n", pk) data := pl.getMutation(lc.startTs) if len(data) > 0 { lc.deltas[key] = data diff --git a/posting/mvcc.go b/posting/mvcc.go index c052422ff6e..513f9912820 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -19,8 +19,8 @@ package posting import ( "bytes" "encoding/hex" - "math" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -59,6 +59,18 @@ type incrRollupi struct { closer *z.Closer } +type CachePL struct { + count int + list *List + lastUpdate uint64 +} + +type GlobalCache struct { + sync.RWMutex + + items map[string]*CachePL +} + var ( // ErrTsTooOld is returned when a transaction is too old to be applied. ErrTsTooOld = errors.Errorf("Transaction is too old") @@ -72,6 +84,8 @@ var ( IncrRollup = &incrRollupi{ priorityKeys: make([]*pooledKeys, 2), } + + globalCache = &GlobalCache{items: make(map[string]*CachePL, 100)} ) func init() { @@ -109,7 +123,7 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error { } } - l, err := GetNoStore(key, math.MaxUint64) + l, err := GetNoStore(key, ts) if err != nil { return err } @@ -118,9 +132,18 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error { if err != nil { return err } - // Clear the list from the cache after a rollup. - RemoveCacheFor(key) + globalCache.Lock() + val, ok := globalCache.items[string(key)] + if ok { + val.list = nil + } + globalCache.Unlock() + // TODO Update cache with rolled up results + // If we do a rollup, we typically won't need to update the key in cache. + // The only caveat is that the key written by rollup would be written at +1 + // timestamp, hence bumping the latest TS for the key by 1. The cache should + // understand that. const N = uint64(1000) if glog.V(2) { if count := atomic.AddUint64(&ir.count, 1); count%N == 0 { @@ -172,8 +195,8 @@ func (ir *incrRollupi) Process(closer *z.Closer, getNewTs func(bool) uint64) { currTs := time.Now().Unix() for _, key := range *batch { hash := z.MemHash(key) - if elem := m[hash]; currTs-elem >= 10 { - // Key not present or Key present but last roll up was more than 10 sec ago. + if elem := m[hash]; currTs-elem >= 2 { + // Key not present or Key present but last roll up was more than 2 sec ago. // Add/Update map and rollup. m[hash] = currTs if err := ir.rollUpKey(writer, key); err != nil { @@ -320,30 +343,57 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error { return nil } -// ResetCache will clear all the cached list. func ResetCache() { + globalCache.Lock() + globalCache.items = make(map[string]*CachePL) + globalCache.Unlock() lCache.Clear() } -// RemoveCacheFor will delete the list corresponding to the given key. -func RemoveCacheFor(key []byte) { - // TODO: investigate if this can be done by calling Set with a nil value. - lCache.Del(key) +func NewCachePL() *CachePL { + return &CachePL{ + count: 0, + list: nil, + lastUpdate: 0, + } } // RemoveCachedKeys will delete the cached list by this txn. -func (txn *Txn) RemoveCachedKeys() { +func (txn *Txn) UpdateCachedKeys(commitTs uint64) { if txn == nil || txn.cache == nil { return } - for key := range txn.cache.deltas { - lCache.Del(key) - } -} -func WaitForCache() { - // TODO Investigate if this is needed and why Jepsen tests fail with the cache enabled. - // lCache.Wait() + for key, delta := range txn.cache.deltas { + pk, _ := x.Parse([]byte(key)) + if !ShouldGoInCache(pk) { + continue + } + globalCache.Lock() + val, ok := globalCache.items[key] + if !ok { + val = NewCachePL() + val.lastUpdate = commitTs + globalCache.items[key] = val + } + if commitTs != 0 { + // TODO Delete this if the values are too old in an async thread + val.lastUpdate = commitTs + } + if !ok { + globalCache.Unlock() + continue + } + + val.count -= 1 + + if commitTs != 0 && val.list != nil { + p := new(pb.PostingList) + x.Check(p.Unmarshal(delta)) + val.list.setMutationAfterCommit(txn.StartTs, commitTs, delta) + } + globalCache.Unlock() + } } func unmarshalOrCopy(plist *pb.PostingList, item *badger.Item) error { @@ -462,33 +512,63 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return l, nil } -func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { - cachedVal, ok := lCache.Get(key) - if ok { - l, ok := cachedVal.(*List) - if ok && l != nil { - // No need to clone the immutable layer or the key since mutations will not modify it. - lCopy := &List{ - minTs: l.minTs, - maxTs: l.maxTs, - key: key, - plist: l.plist, - } - l.RLock() - if l.mutationMap != nil { - lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap)) - for ts, pl := range l.mutationMap { - lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList) - } - } - l.RUnlock() - return lCopy, nil - } +func copyList(l *List) *List { + l.AssertRLock() + // No need to clone the immutable layer or the key since mutations will not modify it. + lCopy := &List{ + minTs: l.minTs, + maxTs: l.maxTs, + key: l.key, + plist: l.plist, } + lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap)) + for k, v := range l.mutationMap { + lCopy.mutationMap[k] = proto.Clone(v).(*pb.PostingList) + } + return lCopy +} +func (c *CachePL) Set(l *List, readTs uint64) { + if c.lastUpdate < readTs && (c.list == nil || c.list.maxTs < l.maxTs) { + c.list = l + } +} + +func ShouldGoInCache(pk x.ParsedKey) bool { + return !pk.IsData() && strings.HasSuffix(pk.Attr, "dgraph.type") +} + +func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { if pstore.IsClosed() { return nil, badger.ErrDBClosed } + + pk, _ := x.Parse(key) + + if ShouldGoInCache(pk) { + globalCache.Lock() + cacheItem, ok := globalCache.items[string(key)] + if !ok { + cacheItem = NewCachePL() + globalCache.items[string(key)] = cacheItem + } + cacheItem.count += 1 + + // We use badger subscription to invalidate the cache. For every write we make the value + // corresponding to the key in the cache to nil. So, if we get some non-nil value from the cache + // then it means that no writes have happened after the last set of this key in the cache. + if ok { + if cacheItem.list != nil && cacheItem.list.minTs <= readTs { + cacheItem.list.RLock() + lCopy := copyList(cacheItem.list) + cacheItem.list.RUnlock() + globalCache.Unlock() + return lCopy, nil + } + } + globalCache.Unlock() + } + txn := pstore.NewTransactionAt(readTs, false) defer txn.Discard() @@ -504,6 +584,26 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { if err != nil { return l, err } - lCache.Set(key, l, 0) + + // Only set l to the cache if readTs >= latestTs, which implies that l is + // the latest version of the PL. We also check that we're reading a version + // from Badger, which is higher than the write registered by the cache. + if ShouldGoInCache(pk) { + globalCache.Lock() + l.RLock() + cacheItem, ok := globalCache.items[string(key)] + if !ok { + cacheItemNew := NewCachePL() + cacheItemNew.count = 1 + cacheItemNew.list = copyList(l) + cacheItemNew.lastUpdate = l.maxTs + globalCache.items[string(key)] = cacheItemNew + } else { + cacheItem.Set(copyList(l), readTs) + } + l.RUnlock() + globalCache.Unlock() + } + return l, nil } diff --git a/posting/mvcc_test.go b/posting/mvcc_test.go index ad0ffa9564b..9061d61d359 100644 --- a/posting/mvcc_test.go +++ b/posting/mvcc_test.go @@ -61,6 +61,42 @@ func TestIncrRollupGetsCancelledQuickly(t *testing.T) { } } +func TestCacheAfterDeltaUpdateRecieved(t *testing.T) { + attr := x.GalaxyAttr("cache") + key := x.IndexKey(attr, "temp") + + // Create a delta from 5->15. Mimick how a follower recieves a delta. + p := new(pb.PostingList) + p.Postings = []*pb.Posting{{ + Uid: 1, + StartTs: 5, + CommitTs: 15, + Op: 1, + }} + delta, err := p.Marshal() + require.NoError(t, err) + + // Write delta to disk and call update + txn := Oracle().RegisterStartTs(5) + txn.cache.deltas[string(key)] = delta + + writer := NewTxnWriter(pstore) + require.NoError(t, txn.CommitToDisk(writer, 15)) + require.NoError(t, writer.Flush()) + + txn.UpdateCachedKeys(15) + + // Read key at timestamp 10. Make sure cache is not updated by this, as there is a later read. + l, err := GetNoStore(key, 10) + require.NoError(t, err) + require.Equal(t, len(l.mutationMap), 0) + + // Read at 20 should show the value + l1, err := GetNoStore(key, 20) + require.NoError(t, err) + require.Equal(t, len(l1.mutationMap), 1) +} + func TestRollupTimestamp(t *testing.T) { attr := x.GalaxyAttr("rollup") key := x.DataKey(attr, 1) @@ -79,8 +115,9 @@ func TestRollupTimestamp(t *testing.T) { edge := &pb.DirectedEdge{ Entity: 1, Attr: attr, - Value: []byte(x.Star), - Op: pb.DirectedEdge_DEL, + + Value: []byte(x.Star), + Op: pb.DirectedEdge_DEL, } addMutation(t, l, edge, Del, 9, 10, false) diff --git a/posting/oracle.go b/posting/oracle.go index dcc310ab8f7..57cf6f317b9 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -290,8 +290,14 @@ func (o *oracle) ProcessDelta(delta *pb.OracleDelta) { o.Lock() defer o.Unlock() - for _, txn := range delta.Txns { - delete(o.pendingTxns, txn.StartTs) + for _, status := range delta.Txns { + txn := o.pendingTxns[status.StartTs] + if txn != nil && status.CommitTs > 0 { + for k := range txn.cache.deltas { + IncrRollup.addKeyToBatch([]byte(k), 0) + } + } + delete(o.pendingTxns, status.StartTs) } curMax := o.MaxAssigned() if delta.MaxAssigned < curMax { diff --git a/worker/draft.go b/worker/draft.go index 0cd1359ed63..6ed5d97489c 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -843,7 +843,7 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { writer := posting.NewTxnWriter(pstore) toDisk := func(start, commit uint64) { txn := posting.Oracle().GetTxn(start) - if txn == nil { + if txn == nil || commit == 0 { return } // If the transaction has failed, we dont need to update it. @@ -875,6 +875,7 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { if err := writer.Flush(); err != nil { return errors.Wrapf(err, "while flushing to disk") } + if x.WorkerConfig.HardSync { if err := pstore.Sync(); err != nil { glog.Errorf("Error while calling Sync while commitOrAbort: %v", err) @@ -889,9 +890,8 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { // Clear all the cached lists that were touched by this transaction. for _, status := range delta.Txns { txn := posting.Oracle().GetTxn(status.StartTs) - txn.RemoveCachedKeys() + txn.UpdateCachedKeys(status.CommitTs) } - posting.WaitForCache() // Now advance Oracle(), so we can service waiting reads. posting.Oracle().ProcessDelta(delta) diff --git a/worker/server_state.go b/worker/server_state.go index 38d00573a68..9cb256c7acd 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -52,7 +52,7 @@ const ( ZeroLimitsDefaults = `uid-lease=0; refill-interval=30s; disable-admin-http=false;` GraphQLDefaults = `introspection=true; debug=false; extensions=true; poll-interval=1s; ` + `lambda-url=;` - CacheDefaults = `size-mb=1024; percentage=0,65,35;` + CacheDefaults = `size-mb=1024; percentage=0,80,20;` FeatureFlagsDefaults = `normalize-compatibility-mode=` ) diff --git a/worker/snapshot.go b/worker/snapshot.go index 311804c4ea5..676821a4051 100644 --- a/worker/snapshot.go +++ b/worker/snapshot.go @@ -114,6 +114,8 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error { if err := deleteStalePreds(ctx, done, snap.ReadTs); err != nil { return err } + // Reset the cache after having received a snapshot. + posting.ResetCache() glog.Infof("Snapshot writes DONE. Sending ACK") // Send an acknowledgement back to the leader. diff --git a/x/keys.go b/x/keys.go index 9160cc384bb..1b0986fd0be 100644 --- a/x/keys.go +++ b/x/keys.go @@ -303,7 +303,7 @@ type ParsedKey struct { func (p ParsedKey) String() string { if p.IsIndex() { - return fmt.Sprintf("UID: %v, Attr: %v, IsIndex: true, Term: %v", p.Uid, p.Attr, p.Count) + return fmt.Sprintf("UID: %v, Attr: %v, IsIndex: true, Term: %v", p.Uid, p.Attr, []byte(p.Term)) } else if p.IsCountOrCountRev() { return fmt.Sprintf("UID: %v, Attr: %v, IsCount/Ref: true, Count: %v", p.Uid, p.Attr, p.Count) } else {