From bf70930c7086d5ae83543d2ca9e4c2ba044cc423 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Fri, 8 Nov 2024 20:57:20 -0500 Subject: [PATCH] cache,db: de-dup concurrent attempts to read the same block Concurrent reads of the same block have been observed to cause very high memory usage, and cause significant CPU usage for allocations/deallocations. We now coordinate across multiple concurrent attempts to read the same block via a readEntry, which makes the readers take turns until one succeeds. The readEntries are embedded in a map that is part of a readShard, where there is a readShard for each cache.Shard. See the long comment in the readShard declaration for motivation. The Options.LoadBlockSema is integrated into the readEntry, to simplify the waiting logic in the caller. Callers interact with this new behavior via Cache.GetWithReadHandle, which is only for callers that intend to do a read and then populate the cache. If this method returns a ReadHandle, the caller has permission to do a read. See the ReadHandle comment for details of the contract. Fixes #4138 --- internal/cache/clockpro.go | 87 ++++++- internal/cache/read_shard.go | 404 +++++++++++++++++++++++++++++ internal/cache/refcnt_normal.go | 13 + internal/cache/refcnt_tracing.go | 4 + sstable/block/buffer_pool.go | 15 +- sstable/colblk/index_block_test.go | 2 +- sstable/colblk/keyspan_test.go | 2 +- sstable/reader.go | 86 ++++-- 8 files changed, 584 insertions(+), 29 deletions(-) create mode 100644 internal/cache/read_shard.go diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go index 2459ec7b69..665ead3f8b 100644 --- a/internal/cache/clockpro.go +++ b/internal/cache/clockpro.go @@ -18,6 +18,7 @@ package cache // import "github.com/cockroachdb/pebble/internal/cache" import ( + "context" "fmt" "os" "runtime" @@ -25,6 +26,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/invariants" @@ -114,9 +116,18 @@ type shard struct { countHot int64 countCold int64 countTest int64 + + // Some fields in readShard are protected by mu. See comments in declaration + // of readShard. + readShard readShard } -func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { +// GetWithMaybeReadEntry is the internal helper for implementing +// Cache.{Get,GetWithReadHandle}. When desireReadEntry is true, and the block +// is not in the cache (!Handle.Valid()), a non-nil readEntry is returned. +func (c *shard) GetWithMaybeReadEntry( + id ID, fileNum base.DiskFileNum, offset uint64, desireReadEntry bool, +) (Handle, *readEntry) { c.mu.RLock() var value *Value if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil { @@ -126,12 +137,30 @@ func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { } } c.mu.RUnlock() + var re *readEntry + if value == nil && desireReadEntry { + c.mu.Lock() + // After the c.mu.RUnlock(), someone could have inserted the value in the + // cache. We could tolerate the race and do a file read, or do another map + // lookup. We choose to do the latter, since the cost of a map lookup is + // insignificant compared to the cost of reading a block from a file. + if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil { + value = e.acquireValue() + if value != nil { + e.referenced.Store(true) + } + } + if value == nil { + re = c.readShard.getReadEntryLocked(id, fileNum, offset) + } + c.mu.Unlock() + } if value == nil { c.misses.Add(1) - return Handle{} + } else { + c.hits.Add(1) } - c.hits.Add(1) - return Handle{value: value} + return Handle{value: value}, re } func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle { @@ -170,6 +199,11 @@ func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value value.ref.trace("add-hot") c.sizeHot += delta } else { + // TODO(sumeer): unclear why we don't set e.ptype to etHot on this path. + // In the default case below, where the state is etTest we set it to + // etHot. But etTest is "colder" than etCold, since the only transition + // into etTest is etCold => etTest, so since etTest transitions to + // etHot, then etCold should also transition. value.ref.trace("add-cold") c.sizeCold += delta } @@ -746,6 +780,7 @@ func newShards(size int64, shards int) *Cache { } c.shards[i].blocks.Init(16) c.shards[i].files.Init(16) + c.shards[i].readShard.Init(&c.shards[i]) } // Note: this is a no-op if invariants are disabled or race is enabled. @@ -822,7 +857,49 @@ func (c *Cache) Unref() { // Get retrieves the cache value for the specified file and offset, returning // nil if no value is present. func (c *Cache) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { - return c.getShard(id, fileNum, offset).Get(id, fileNum, offset) + h, re := c.getShard(id, fileNum, offset).GetWithMaybeReadEntry( + id, fileNum, offset, false) + if invariants.Enabled && re != nil { + panic("readEntry should be nil") + } + return h +} + +// GetWithReadHandle retrieves the cache value for the specified ID, fileNum +// and offset. If found, a valid Handle is returned (with cacheHit set to +// true), else a valid ReadHandle is returned. +// +// See the ReadHandle declaration for the contract the caller must satisfy +// when getting a valid ReadHandle. +// +// This method can block before returning since multiple concurrent gets for +// the same cache value will take turns getting a ReadHandle, which represents +// permission to do the read. This blocking respects context cancellation, in +// which case an error is returned (and not a valid ReadHandle). +// +// When blocking, the errorDuration return value can be non-zero and is +// populated with the total duration that other readers that observed an error +// (see ReadHandle.SetReadError) spent in doing the read. This duration can be +// greater than the time spent blocked in this method, since some of these +// errors could have occurred prior to this call. But it serves as a rough +// indicator of whether turn taking could have caused higher latency due to +// context cancellation of other readers. +// +// While waiting, someone else may successfully read the value, which results +// in a valid Handle being returned. This is a case where cacheHit=false. +func (c *Cache) GetWithReadHandle( + ctx context.Context, id ID, fileNum base.DiskFileNum, offset uint64, +) (h Handle, rh ReadHandle, errorDuration time.Duration, cacheHit bool, err error) { + h, re := c.getShard(id, fileNum, offset).GetWithMaybeReadEntry( + id, fileNum, offset, true) + if h.Valid() { + return h, ReadHandle{}, 0, true, nil + } + h, errorDuration, err = re.waitForReadPermissionOrHandle(ctx) + if err != nil || h.Valid() { + return h, ReadHandle{}, errorDuration, false, err + } + return Handle{}, ReadHandle{entry: re}, errorDuration, false, nil } // Set sets the cache value for the specified file and offset, overwriting an diff --git a/internal/cache/read_shard.go b/internal/cache/read_shard.go new file mode 100644 index 0000000000..ef43e618fb --- /dev/null +++ b/internal/cache/read_shard.go @@ -0,0 +1,404 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/swiss" +) + +// readShard coordinates the read of a block that will be put in the cache. It +// ensures only one goroutine is reading a block, and other callers block +// until that goroutine is done (with success or failure). In the case of +// success, the other goroutines will use the value that was read, even if it +// is too large to be placed in the cache, or got evicted from the cache +// before they got scheduled. In the case of a failure (read error or context +// cancellation), one of the waiters will be given a turn to do the read. +// +// This turn-taking ensures that a large number of concurrent attempts to read +// the same block that is not in the cache does not result in the same number +// of reads from the filesystem (or remote storage). We have seen large spikes +// in memory usage (and CPU usage for memory allocation/deallocation) without +// this turn-taking. +// +// It introduces a small risk related to context cancellation -- if many +// readers assigned a turn exceed their deadline while doing the read and +// report an error, a reader with a longer deadline can unnecessarily wait. We +// accept this risk for now since the primary production use in CockroachDB is +// filesystem reads, where context cancellation is not respected. We do +// introduce an error duration metric emitted in traces that can be used to +// quantify such wasteful waiting. Note that this same risk extends to waiting +// on the Options.LoadBlockSema, so the error duration metric includes the +// case of an error when waiting on the semaphore (as a side effect of that +// waiting happening in the caller, sstable.Reader). +// +// Design choices and motivation: +// +// - readShard is tightly integrated with a cache shard: At its core, +// readShard is a map with synchronization. For the same reason the cache is +// sharded (for higher concurrency by sharding the mutex), it is beneficial +// to shard synchronization on readShard. By making readShard a member of +// shard, this sharding is trivially accomplished. Additionally, the code +// feels cleaner when there isn't a race between a cache miss, followed by +// creating a readEntry that is no longer needed because someone else has +// done the read since the miss and inserted into the cache. By making the +// readShard use shard.mu, such a race is avoided. A side benefit is that +// the cache interaction can be hidden behind readEntry.SetReadValue. One +// disadvantage of this tightly integrated design is that it does not +// encompass readers that will put the read value into a block.BufferPool -- +// we don't worry about those since block.BufferPool is only used for +// compactions and there is at most one compaction reader of a block. There +// is the possibility that the compaction reader and a user-facing iterator +// reader will do duplicate reads, but we accept that deficiency. +// +// - readMap is separate from shard.blocks map: One could have a design which +// extends the cache entry and unifies the two maps. However, we never want +// to evict a readEntry while there are readers waiting for the block read +// (including the case where the corresponding file is being removed from +// shard.files). Also, the number of stable cache entries is huge and +// therefore is manually allocated, while the number of readEntries is small +// (so manual allocation isn't necessary). For these reasons we maintain a +// separate map. This separation also results in more modular code, instead +// of piling more stuff into shard. +type readShard struct { + shard *shard + // Protected by shard.mu. + // + // shard.mu is never held when acquiring readEntry.mu. shard.mu is a shared + // resource and must be released quickly. + shardMu struct { + readMap swiss.Map[key, *readEntry] + } +} + +func (rs *readShard) Init(shard *shard) *readShard { + *rs = readShard{ + shard: shard, + } + // Choice of 16 is arbitrary. + rs.shardMu.readMap.Init(16) + return rs +} + +// getReadEntryLocked gets a *readEntry for (id, fileNum, offset). shard.mu is +// already write locked. +func (rs *readShard) getReadEntryLocked(id ID, fileNum base.DiskFileNum, offset uint64) *readEntry { + k := key{fileKey{id, fileNum}, offset} + e, ok := rs.shardMu.readMap.Get(k) + if !ok { + e = newReadEntry(rs, id, fileNum, offset) + rs.shardMu.readMap.Put(k, e) + } else { + e.refCount.acquireAllowZero() + } + return e +} + +// readEntry is used to coordinate between concurrent attempted readers of the +// same block. +type readEntry struct { + readShard *readShard + id ID + fileNum base.DiskFileNum + offset uint64 + mu struct { + sync.RWMutex + // v, when non-nil, has a ref from readEntry, which is unreffed when + // readEntry is deleted from the readMap. + v *Value + // isReading and ch together capture the state of whether someone has been + // granted a turn to read, and of readers waiting for that read to finish. + // ch is lazily allocated since most readEntries will not see concurrent + // readers. This lazy allocation results in one transition of ch from nil + // to non-nil, so waiters can read this non-nil ch and block on reading + // from it without holding mu. + // + // ch is written to, to signal one waiter to start doing the read. ch is + // closed when the value is successfully read and has been stored in v, so + // that all waiters wake up and read v. ch is a buffered channel with a + // capacity of 1. + // + // State transitions when trying to wait for turn: + // Case !isReading: + // set isReading=true; Drain the ch if non-nil and non-empty; proceed + // with turn to do the read. + // Case isReading: + // allocate ch if nil; wait on ch + // Finished reading successfully: + // set isReading=false; if ch is non-nil, close ch. + // Finished reading with failure: + // set isReading=false; if ch is non-nil, write to ch. + // + // INVARIANT: + // isReading => ch is nil or ch is empty. + isReading bool + ch chan struct{} + // Total duration of reads and semaphore waiting that resulted in error. + errorDuration time.Duration + readStart time.Time + } + // Count of ReadHandles that refer to this readEntry. Increments always hold + // shard.mu. So if this is found to be 0 while holding shard.mu, it is safe + // to delete readEntry from readShard.shardMu.readMap. + refCount refcnt +} + +var readEntryPool = sync.Pool{ + New: func() interface{} { + return &readEntry{} + }, +} + +func newReadEntry(rs *readShard, id ID, fileNum base.DiskFileNum, offset uint64) *readEntry { + e := readEntryPool.Get().(*readEntry) + *e = readEntry{ + readShard: rs, + id: id, + fileNum: fileNum, + offset: offset, + } + e.refCount.init(1) + return e +} + +// waitForReadPermissionOrHandle returns either an already read value (in +// Handle), an error (if the context was cancelled), or neither, which is a +// directive to the caller to do the read. In this last case the caller must +// call either setReadValue or setReadError. +// +// In all cases, errorDuration is populated with the total duration that +// readers that observed an error (setReadError) spent in doing the read. This +// duration can be greater than the time spend in waitForReadPermissionHandle, +// since some of these errors could have occurred prior to this call. But it +// serves as a rough indicator of whether turn taking could have caused higher +// latency due to context cancellation. +func (e *readEntry) waitForReadPermissionOrHandle( + ctx context.Context, +) (h Handle, errorDuration time.Duration, err error) { + constructHandleLocked := func() Handle { + if e.mu.v == nil { + panic("value is nil") + } + e.mu.v.acquire() + return Handle{value: e.mu.v} + } + becomeReaderLocked := func() { + if e.mu.v != nil { + panic("value is non-nil") + } + e.mu.isReading = true + if e.mu.ch != nil { + // Drain the channel, so that no one else mistakenly believes they + // should read. + select { + case <-e.mu.ch: + default: + } + } + e.mu.readStart = time.Now() + } + unlockAndUnrefAndTryRemoveFromMap := func(readLock bool) (errorDuration time.Duration) { + removeState := e.makeTryRemoveStateLocked() + errorDuration = e.mu.errorDuration + if readLock { + e.mu.RUnlock() + } else { + e.mu.Unlock() + } + unrefAndTryRemoveFromMap(removeState) + return errorDuration + } + e.mu.Lock() + if e.mu.v != nil { + // Value has already been read. + h := constructHandleLocked() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(false) + return h, errorDuration, nil + } + // Not already read. Wait for turn to do the read or for someone else to do + // the read. + if !e.mu.isReading { + // Have permission to do the read. + becomeReaderLocked() + errorDuration = e.mu.errorDuration + e.mu.Unlock() + return Handle{}, errorDuration, nil + } + if e.mu.ch == nil { + // Rare case when multiple readers are concurrently trying to read. If + // this turns out to be common enough we could use a sync.Pool. + e.mu.ch = make(chan struct{}, 1) + } + ch := e.mu.ch + e.mu.Unlock() + select { + case <-ctx.Done(): + e.mu.Lock() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(false) + return Handle{}, errorDuration, ctx.Err() + case _, ok := <-ch: + if ok { + // Granted permission to do the read. + e.mu.Lock() + becomeReaderLocked() + errorDuration = e.mu.errorDuration + e.mu.Unlock() + return Handle{}, errorDuration, nil + } else { + // Channel closed, so value was read. + e.mu.RLock() + if e.mu.v == nil { + panic("value is nil") + } + h := constructHandleLocked() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(true) + return h, errorDuration, nil + } + } +} + +// tryRemoveState captures the state needed by tryRemoveFromMap. The caller +// constructs it before calling unrefAndTryRemoveFromMap since it typically +// held readEntry.mu, which avoids acquiring it again in +// unrefAndTryRemoveFromMap. +type tryRemoveState struct { + rs *readShard + k key + e *readEntry +} + +// makeTryRemoveStateLocked initializes tryRemoveState. +func (e *readEntry) makeTryRemoveStateLocked() tryRemoveState { + return tryRemoveState{ + rs: e.readShard, + k: key{fileKey{e.id, e.fileNum}, e.offset}, + e: e, + } +} + +// unrefAndTryRemoveFromMap tries to remove s.k => s.e from the map in s.rs. +// It is possible that after unreffing that s.e has already been removed, and +// is now back in the sync.Pool, or being reused (for the same or different +// key). This is because after unreffing, which caused the s.e.refCount to +// become zero, but before acquiring shard.mu, it could have been incremented +// and decremented concurrently, and some other goroutine could have observed +// a different decrement to 0, and raced ahead and deleted s.e from the +// readMap. +func unrefAndTryRemoveFromMap(s tryRemoveState) { + if !s.e.refCount.release() { + return + } + s.rs.shard.mu.Lock() + e2, ok := s.rs.shardMu.readMap.Get(s.k) + if !ok || e2 != s.e { + // Already removed. + s.rs.shard.mu.Unlock() + return + } + if s.e.refCount.value() != 0 { + s.rs.shard.mu.Unlock() + return + } + // k => e and e.refCount == 0. And it cannot be incremented since + // shard.mu.Lock() is held. So remove from map. + s.rs.shardMu.readMap.Delete(s.k) + s.rs.shard.mu.Unlock() + + // Free s.e. + s.e.mu.Lock() + if s.e.mu.v != nil { + s.e.mu.v.release() + s.e.mu.v = nil + } + s.e.mu.Unlock() + *s.e = readEntry{} + readEntryPool.Put(s.e) +} + +func (e *readEntry) setReadValue(v *Value) Handle { + // Add to the cache before taking another ref for readEntry, since the cache + // expects ref=1 when it is called. + // + // TODO(sumeer): if e.refCount > 1, we should consider overriding to ensure + // that it is added as etHot. The common case will be e.refCount = 1, and we + // don't want to acquire e.mu twice, so one way to do this would be relax + // the invariant in shard.Set that requires Value.refs() == 1. Then we can + // do the work under e.mu before calling shard.Set. + h := e.readShard.shard.Set(e.id, e.fileNum, e.offset, v) + e.mu.Lock() + // Acquire a ref for readEntry, since we are going to remember it in e.mu.v. + v.acquire() + e.mu.v = v + if !e.mu.isReading { + panic("isReading is false") + } + e.mu.isReading = false + if e.mu.ch != nil { + // Inform all waiters so they can use e.mu.v. Not all readers have called + // readEntry.waitForReadPermissionOrHandle, and those will also use + // e.mu.v. + close(e.mu.ch) + } + removeState := e.makeTryRemoveStateLocked() + e.mu.Unlock() + unrefAndTryRemoveFromMap(removeState) + return h +} + +func (e *readEntry) setReadError(err error) { + e.mu.Lock() + if !e.mu.isReading { + panic("isReading is false") + } + e.mu.isReading = false + if e.mu.ch != nil { + select { + case e.mu.ch <- struct{}{}: + default: + panic("channel is not empty") + } + } + e.mu.errorDuration += time.Since(e.mu.readStart) + removeState := e.makeTryRemoveStateLocked() + e.mu.Unlock() + unrefAndTryRemoveFromMap(removeState) +} + +// ReadHandle represents a contract with a caller that had a miss when doing a +// cache lookup, and wants to do a read and insert the read block into the +// cache. The contract applies when ReadHandle.Valid returns true, in which +// case the caller has been assigned the turn to do the read (and others are +// potentially waiting for it). +// +// Contract: +// +// The caller must immediately start doing a read, or can first wait on a +// shared resource that would also block a different reader if it was assigned +// the turn instead (specifically, this refers to Options.LoadBlockSema). +// After the read, it must either call SetReadValue or SetReadError depending +// on whether the read succeeded or failed. +type ReadHandle struct { + entry *readEntry +} + +// Valid returns true for a valid ReadHandle. +func (rh ReadHandle) Valid() bool { + return rh.entry != nil +} + +// SetReadValue provides the Value that the caller has read. The caller is +// responsible for releasing the returned Handle when it is no longer needed. +func (rh ReadHandle) SetReadValue(v *Value) Handle { + return rh.entry.setReadValue(v) +} + +// SetReadError specifies that the caller has encountered a read error. +func (rh ReadHandle) SetReadError(err error) { + rh.entry.setReadError(err) +} diff --git a/internal/cache/refcnt_normal.go b/internal/cache/refcnt_normal.go index 9ab3348613..94beb232f0 100644 --- a/internal/cache/refcnt_normal.go +++ b/internal/cache/refcnt_normal.go @@ -37,6 +37,15 @@ func (v *refcnt) acquire() { } } +// acquireAllowZero is the same as acquire, but allows acquireAllowZero to be +// called with a zero refcnt. This is useful for cases where the entry which +// is being reference counted is inside a container and the container does not +// hold a reference. The container uses release() returning true to attempt to +// do a cleanup from the map. +func (v *refcnt) acquireAllowZero() { + v.val.Add(1) +} + func (v *refcnt) release() bool { switch v := v.val.Add(-1); { case v < 0: @@ -48,6 +57,10 @@ func (v *refcnt) release() bool { } } +func (v *refcnt) value() int32 { + return v.val.Load() +} + func (v *refcnt) trace(msg string) { } diff --git a/internal/cache/refcnt_tracing.go b/internal/cache/refcnt_tracing.go index 1d5e6c0219..a0329c69a7 100644 --- a/internal/cache/refcnt_tracing.go +++ b/internal/cache/refcnt_tracing.go @@ -51,6 +51,10 @@ func (v *refcnt) release() bool { return n == 0 } +func (v *refcnt) value() int32 { + return v.val.Load() +} + func (v *refcnt) trace(msg string) { s := fmt.Sprintf("%s: refs=%d\n%s", msg, v.refs(), debug.Stack()) v.Lock() diff --git a/sstable/block/buffer_pool.go b/sstable/block/buffer_pool.go index 38809b41ed..80bf550cd0 100644 --- a/sstable/block/buffer_pool.go +++ b/sstable/block/buffer_pool.go @@ -8,6 +8,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" + "github.com/cockroachdb/pebble/internal/invariants" ) // Alloc allocates a new Value for a block of length n (excluding the block @@ -52,12 +53,22 @@ func (b Value) BlockMetadata() *Metadata { // backed by a buffer pool, MakeHandle inserts the value into the block cache, // returning a handle to the now resident value. func (b Value) MakeHandle( - c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, + crh cache.ReadHandle, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, ) BufferHandle { if b.buf.Valid() { + if invariants.Enabled && crh.Valid() { + panic("cache.ReadHandle was valid") + } return BufferHandle{b: b.buf} } - return BufferHandle{h: c.Set(cacheID, fileNum, offset, b.v)} + return BufferHandle{h: crh.SetReadValue(b.v)} +} + +func (b Value) SetInCacheAndReleaseForTesting( + c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, +) { + h := c.Set(cacheID, fileNum, offset, b.v) + h.Release() } // Release releases the handle. diff --git a/sstable/colblk/index_block_test.go b/sstable/colblk/index_block_test.go index c7e7d6a3dc..970535455e 100644 --- a/sstable/colblk/index_block_test.go +++ b/sstable/colblk/index_block_test.go @@ -125,7 +125,7 @@ func TestIndexIterInitHandle(t *testing.T) { d := (*IndexBlockDecoder)(unsafe.Pointer(v.BlockMetadata())) d.Init(blockData) - v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release() + v.SetInCacheAndReleaseForTesting(c, cache.ID(1), base.DiskFileNum(1), 0) getBlockAndIterate := func(it *IndexIter) { h := c.Get(cache.ID(1), base.DiskFileNum(1), 0) diff --git a/sstable/colblk/keyspan_test.go b/sstable/colblk/keyspan_test.go index 9592dabcc1..a306e7f192 100644 --- a/sstable/colblk/keyspan_test.go +++ b/sstable/colblk/keyspan_test.go @@ -88,7 +88,7 @@ func TestKeyspanBlockPooling(t *testing.T) { copy(v.BlockData(), b) d := (*KeyspanDecoder)(unsafe.Pointer(v.BlockMetadata())) d.Init(v.BlockData()) - v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release() + v.SetInCacheAndReleaseForTesting(c, cache.ID(1), base.DiskFileNum(1), 0) getBlockAndIterate := func() { h := c.Get(cache.ID(1), base.DiskFileNum(1), 0) diff --git a/sstable/reader.go b/sstable/reader.go index 8955b054eb..9e4aa3430f 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -499,17 +499,48 @@ func (r *Reader) readBlockInternal( bh block.Handle, initBlockMetadataFn func(*block.Metadata, []byte) error, ) (handle block.BufferHandle, _ error) { - if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Valid() { - // Cache hit. - if readHandle != nil { - readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen)) + var ch cache.Handle + var crh cache.ReadHandle + hit := true + if env.BufferPool == nil { + var errorDuration time.Duration + var err error + ch, crh, errorDuration, hit, err = r.cacheOpts.Cache.GetWithReadHandle( + ctx, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + if errorDuration > 5*time.Millisecond && r.logger.IsTracingEnabled(ctx) { + r.logger.Eventf( + ctx, "waited for turn when %s time wasted by failed reads", errorDuration.String()) + } + // TODO(sumeer): consider tracing when waited longer than some duration + // for turn to do the read. + if err != nil { + return block.BufferHandle{}, err + } + } else { + // The compaction path uses env.BufferPool, and does not coordinate read + // using a cache.ReadHandle. This is ok since only a single compaction is + // reading a block. + ch = r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + if ch.Valid() { + hit = true } - env.BlockServedFromCache(bh.Length) - return block.CacheBufferHandle(h), nil + } + // INVARIANT: hit => ch.Valid() + if ch.Valid() { + if hit { + // Cache hit. + if readHandle != nil { + readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen)) + } + env.BlockServedFromCache(bh.Length) + } + if invariants.Enabled && crh.Valid() { + panic("cache.ReadHandle must not be valid") + } + return block.CacheBufferHandle(ch), nil } - // Cache miss. - + // Need to read. First acquire loadBlockSema, if needed. if sema := r.loadBlockSema; sema != nil { if err := sema.Acquire(ctx, 1); err != nil { // An error here can only come from the context. @@ -517,7 +548,26 @@ func (r *Reader) readBlockInternal( } defer sema.Release(1) } + value, err := r.doRead(ctx, env, readHandle, bh, initBlockMetadataFn) + if err != nil { + if crh.Valid() { + crh.SetReadError(err) + } + return block.BufferHandle{}, err + } + h := value.MakeHandle(crh, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + return h, nil +} +// doRead is a helper for readBlockInternal that does the read, checksum +// check, decompression, and returns either a block.Value or an error. +func (r *Reader) doRead( + ctx context.Context, + env readBlockEnv, + readHandle objstorage.ReadHandle, + bh block.Handle, + initBlockMetadataFn func(*block.Metadata, []byte) error, +) (block.Value, error) { compressed := block.Alloc(int(bh.Length+block.TrailerLen), env.BufferPool) readStopwatch := makeStopwatch() var err error @@ -540,17 +590,15 @@ func (r *Reader) readBlockInternal( } if err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } env.BlockRead(bh.Length, readDuration) - if err := checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil { + if err = checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - typ := block.CompressionIndicator(compressed.BlockData()[bh.Length]) compressed.Truncate(int(bh.Length)) - var decompressed block.Value if typ == block.NoCompressionIndicator { decompressed = compressed @@ -559,23 +607,21 @@ func (r *Reader) readBlockInternal( decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.BlockData()) if err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - decompressed = block.Alloc(decodedLen, env.BufferPool) err = block.DecompressInto(typ, compressed.BlockData()[prefixLen:], decompressed.BlockData()) compressed.Release() if err != nil { decompressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } } - if err := initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil { + if err = initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil { decompressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) - return h, nil + return decompressed, nil } func (r *Reader) readMetaindex(