diff --git a/posting/list.go b/posting/list.go
index b55521294f1..8c86562ad96 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -679,11 +679,6 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
     // mposts is the list of mutable postings
     deleteBelowTs, mposts := l.pickPostings(readTs)

-    return l.iterateInternal(readTs, afterUid, f, deleteBelowTs, mposts)
-}
-
-func (l *List) iterateInternal(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error, deleteBelowTs uint64,
-    mposts []*pb.Posting) error {
     if readTs < l.minTs {
         return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key)
     }
@@ -818,18 +813,18 @@ func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, boo
     dec := codec.Decoder{Pack: l.plist.Pack}
     uids := dec.Seek(uid, codec.SeekStart)
     length := codec.ExactLen(l.plist.Pack)
-    found1 := len(uids) > 0 && uids[0] == uid
+    found := len(uids) > 0 && uids[0] == uid

     for _, plist := range l.mutationMap {
         for _, mpost := range plist.Postings {
             if (mpost.CommitTs > 0 && mpost.CommitTs <= readTs) || (mpost.StartTs == readTs) {
                 if hasDeleteAll(mpost) {
-                    found1 = false
+                    found = false
                     length = 0
                     continue
                 }
                 if mpost.Uid == uid {
-                    found1 = (mpost.Op == Set)
+                    found = (mpost.Op == Set)
                 }
                 if mpost.Op == Set {
                     length += 1
@@ -841,7 +836,7 @@
             }
         }
     }
-    return length, found1, nil
+    return length, found, nil
 }

 func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
@@ -877,38 +872,6 @@ func (l *List) length(readTs, afterUid uint64) int {
     return count
 }

-func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
-    l.AssertRLock()
-
-    dec := codec.Decoder{Pack: l.plist.Pack}
-    uids := dec.Seek(uid, codec.SeekStart)
-    length := codec.ExactLen(l.plist.Pack)
-    found1 := len(uids) > 0 && uids[0] == uid
-
-    for _, plist := range l.mutationMap {
-        for _, mpost := range plist.Postings {
-            if (mpost.CommitTs > 0 && mpost.CommitTs <= readTs) || (mpost.StartTs == readTs) {
-                if hasDeleteAll(mpost) {
-                    found1 = false
-                    length = 0
-                    continue
-                }
-                if mpost.Uid == uid {
-                    found1 = (mpost.Op == Set)
-                }
-                if mpost.Op == Set {
-                    length += 1
-                } else {
-                    length -= 1
-                }
-
-            }
-        }
-    }
-
-    return length, found1, nil
-}
-
 // Length iterates over the mutation layer and counts number of elements.
 func (l *List) Length(readTs, afterUid uint64) int {
     l.RLock()
diff --git a/posting/mvcc.go b/posting/mvcc.go
index afd1bb2517c..61186140cbc 100644
--- a/posting/mvcc.go
+++ b/posting/mvcc.go
@@ -58,12 +58,16 @@
     closer *z.Closer
 }

+type CachePL struct {
+    count      int
+    list       *List
+    lastUpdate uint64
+}
+
 type GlobalCache struct {
     sync.RWMutex

-    count      map[string]int
-    list       map[string]*List
-    lastUpdate map[string]uint64
+    items map[string]*CachePL
 }

 var (
@@ -80,11 +84,7 @@
         priorityKeys: make([]*pooledKeys, 2),
     }

-    globalCache = &GlobalCache{
-        count:      make(map[string]int),
-        list:       make(map[string]*List),
-        lastUpdate: make(map[string]uint64),
-    }
+    globalCache = &GlobalCache{items: make(map[string]*CachePL)}
 )

 func init() {
@@ -338,9 +338,7 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {

 func ResetCache() {
     globalCache.Lock()
-    globalCache.count = make(map[string]int)
-    globalCache.list = make(map[string]*List)
-    globalCache.lastUpdate = make(map[string]uint64)
+    globalCache.items = make(map[string]*CachePL)
     globalCache.Unlock()
     lCache.Clear()
 }
@@ -353,34 +351,38 @@
     for key, delta := range txn.cache.deltas {
         pk, _ := x.Parse([]byte(key))
-        if pk.IsData() {
+        if !ShouldGoInCache(pk) {
             continue
         }
         globalCache.Lock()
+        val, ok := globalCache.items[key]
+        if !ok {
+            val = &CachePL{
+                count:      0,
+                list:       nil,
+                lastUpdate: commitTs,
+            }
+            globalCache.items[key] = val
+        }
         if commitTs != 0 {
             // TODO Delete this if the values are too old in an async thread
-            globalCache.lastUpdate[key] = commitTs
+            val.lastUpdate = commitTs
         }

-        val, ok := globalCache.count[key]
         if !ok {
             globalCache.Unlock()
             continue
         }

-        globalCache.count[key] = val - 1
-        if val == 1 {
-            delete(globalCache.count, key)
-            delete(globalCache.list, key)
+        val.count -= 1
+        if val.count == 1 {
+            val.list = nil
             globalCache.Unlock()
             continue
         }
-        if commitTs != 0 {
-            pl, ok := globalCache.list[key]
-            if ok {
-                p := new(pb.PostingList)
-                x.Check(p.Unmarshal(delta))
-                pl.setMutationAfterCommit(txn.StartTs, commitTs, delta)
-            }
+        if commitTs != 0 && val.list != nil {
+            p := new(pb.PostingList)
+            x.Check(p.Unmarshal(delta))
+            val.list.setMutationAfterCommit(txn.StartTs, commitTs, delta)
         }
         globalCache.Unlock()
     }
@@ -518,28 +520,50 @@ func copyList(l *List) *List {
     return lCopy
 }

+func (c *CachePL) Set(l *List, readTs uint64) {
+    if c.lastUpdate < readTs && (c.list == nil || c.list.maxTs < l.maxTs) {
+        c.list = l
+    }
+}
+
+func ShouldGoInCache(pk x.ParsedKey) bool {
+    return !pk.IsData()
+}
+
 func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
     if pstore.IsClosed() {
         return nil, badger.ErrDBClosed
     }

     pk, _ := x.Parse(key)
-    globalCache.Lock()
-    globalCache.count[string(key)]++
-
-    // We use badger subscription to invalidate the cache. For every write we make the value
-    // corresponding to the key in the cache to nil. So, if we get some non-nil value from the cache
-    // then it means that no writes have happened after the last set of this key in the cache.
-    if l, ok := globalCache.list[string(key)]; ok {
-        if l != nil && l.minTs <= readTs {
-            l.RLock()
-            lCopy := copyList(l)
-            l.RUnlock()
-            globalCache.Unlock()
-            return lCopy, nil
+
+    if ShouldGoInCache(pk) {
+        globalCache.Lock()
+        cacheItem, ok := globalCache.items[string(key)]
+        if !ok {
+            cacheItem = &CachePL{
+                count:      0,
+                list:       nil,
+                lastUpdate: 0,
+            }
+            globalCache.items[string(key)] = cacheItem
+        }
+        cacheItem.count += 1
+
+        // We use badger subscription to invalidate the cache. For every write we make the value
+        // corresponding to the key in the cache to nil. So, if we get some non-nil value from the cache
+        // then it means that no writes have happened after the last set of this key in the cache.
+        if ok {
+            if cacheItem.list != nil && cacheItem.list.minTs <= readTs {
+                cacheItem.list.RLock()
+                lCopy := copyList(cacheItem.list)
+                cacheItem.list.RUnlock()
+                globalCache.Unlock()
+                return lCopy, nil
+            }
         }
+        globalCache.Unlock()
     }
-    globalCache.Unlock()

     txn := pstore.NewTransactionAt(readTs, false)
     defer txn.Discard()
@@ -560,14 +584,19 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
     // Only set l to the cache if readTs >= latestTs, which implies that l is
     // the latest version of the PL. We also check that we're reading a version
    // from Badger, which is higher than the write registered by the cache.
-    if !pk.IsData() && readTs >= l.maxTs {
+    if ShouldGoInCache(pk) {
         globalCache.Lock()
         l.RLock()
-        cacheList, ok := globalCache.list[string(key)]
-        if !ok || (ok && cacheList.maxTs < l.maxTs) {
-            if lastUpdateTs, k := globalCache.lastUpdate[string(key)]; !k || (k && lastUpdateTs < readTs) {
-                globalCache.list[string(key)] = copyList(l)
+        cacheItem, ok := globalCache.items[string(key)]
+        if !ok {
+            cacheItemNew := &CachePL{
+                count:      1,
+                list:       copyList(l),
+                lastUpdate: l.maxTs,
             }
+            globalCache.items[string(key)] = cacheItemNew
+        } else {
+            cacheItem.Set(copyList(l), readTs)
         }
         l.RUnlock()
         globalCache.Unlock()
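Not part of the patch: the refcount-plus-timestamp guard that CachePL and CachePL.Set introduce can be shown in isolation. The sketch below is a minimal, self-contained approximation in plain Go; fakeList, cachePL, and cache are hypothetical stand-ins (the real posting.List and the globalCache above carry more state and locking), and the timestamps are invented for the example.

package main

import (
    "fmt"
    "sync"
)

// fakeList is a hypothetical stand-in for posting.List; only maxTs is
// needed to demonstrate the guard in CachePL.Set.
type fakeList struct {
    maxTs uint64
}

// cachePL mirrors the CachePL added by the patch: a reference count,
// the cached list (nil once invalidated), and the commit timestamp of
// the last write registered for the key.
type cachePL struct {
    count      int
    list       *fakeList
    lastUpdate uint64
}

// set mirrors CachePL.Set: only store the candidate if no commit newer
// than this read has been registered, and the candidate is newer than
// whatever is already cached.
func (c *cachePL) set(l *fakeList, readTs uint64) {
    if c.lastUpdate < readTs && (c.list == nil || c.list.maxTs < l.maxTs) {
        c.list = l
    }
}

// cache is a simplified analogue of globalCache: one map of cachePL
// values instead of three parallel count/list/lastUpdate maps.
type cache struct {
    sync.Mutex
    items map[string]*cachePL
}

func main() {
    c := &cache{items: make(map[string]*cachePL)}
    key := "example-index-key" // hypothetical non-data key

    // A reader at readTs=10 misses, registers itself, and caches what it
    // read from disk (a list whose maxTs is 5).
    c.Lock()
    item, ok := c.items[key]
    if !ok {
        item = &cachePL{}
        c.items[key] = item
    }
    item.count++
    item.set(&fakeList{maxTs: 5}, 10)
    c.Unlock()

    // A commit at ts=12 is registered for the key.
    c.Lock()
    c.items[key].lastUpdate = 12
    c.Unlock()

    // A reader at readTs=11 must not overwrite the entry: the cache has
    // already seen a commit (12) past that read timestamp.
    c.Lock()
    c.items[key].set(&fakeList{maxTs: 7}, 11)
    fmt.Println(c.items[key].list.maxTs) // prints 5
    c.Unlock()
}

Collapsing the three per-key maps into a single CachePL value means one lookup and one critical section cover the refcount, the cached list, and the last-commit bookkeeping together, which is what the UpdateCachedKeys and getNew hunks above rely on.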