diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go index 2459ec7b69..665ead3f8b 100644 --- a/internal/cache/clockpro.go +++ b/internal/cache/clockpro.go @@ -18,6 +18,7 @@ package cache // import "github.com/cockroachdb/pebble/internal/cache" import ( + "context" "fmt" "os" "runtime" @@ -25,6 +26,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/invariants" @@ -114,9 +116,18 @@ type shard struct { countHot int64 countCold int64 countTest int64 + + // Some fields in readShard are protected by mu. See comments in declaration + // of readShard. + readShard readShard } -func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { +// GetWithMaybeReadEntry is the internal helper for implementing +// Cache.{Get,GetWithReadHandle}. When desireReadEntry is true, and the block +// is not in the cache (!Handle.Valid()), a non-nil readEntry is returned. +func (c *shard) GetWithMaybeReadEntry( + id ID, fileNum base.DiskFileNum, offset uint64, desireReadEntry bool, +) (Handle, *readEntry) { c.mu.RLock() var value *Value if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil { @@ -126,12 +137,30 @@ func (c *shard) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { } } c.mu.RUnlock() + var re *readEntry + if value == nil && desireReadEntry { + c.mu.Lock() + // After the c.mu.RUnlock(), someone could have inserted the value in the + // cache. We could tolerate the race and do a file read, or do another map + // lookup. We choose to do the latter, since the cost of a map lookup is + // insignificant compared to the cost of reading a block from a file. + if e, _ := c.blocks.Get(key{fileKey{id, fileNum}, offset}); e != nil { + value = e.acquireValue() + if value != nil { + e.referenced.Store(true) + } + } + if value == nil { + re = c.readShard.getReadEntryLocked(id, fileNum, offset) + } + c.mu.Unlock() + } if value == nil { c.misses.Add(1) - return Handle{} + } else { + c.hits.Add(1) } - c.hits.Add(1) - return Handle{value: value} + return Handle{value: value}, re } func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value) Handle { @@ -170,6 +199,11 @@ func (c *shard) Set(id ID, fileNum base.DiskFileNum, offset uint64, value *Value value.ref.trace("add-hot") c.sizeHot += delta } else { + // TODO(sumeer): unclear why we don't set e.ptype to etHot on this path. + // In the default case below, where the state is etTest we set it to + // etHot. But etTest is "colder" than etCold, since the only transition + // into etTest is etCold => etTest, so since etTest transitions to + // etHot, then etCold should also transition. value.ref.trace("add-cold") c.sizeCold += delta } @@ -746,6 +780,7 @@ func newShards(size int64, shards int) *Cache { } c.shards[i].blocks.Init(16) c.shards[i].files.Init(16) + c.shards[i].readShard.Init(&c.shards[i]) } // Note: this is a no-op if invariants are disabled or race is enabled. @@ -822,7 +857,49 @@ func (c *Cache) Unref() { // Get retrieves the cache value for the specified file and offset, returning // nil if no value is present. 
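[Editorial note, not part of the diff] GetWithMaybeReadEntry above re-checks the blocks map after upgrading from the read lock to the write lock. A minimal generic sketch of that double-checked pattern, with hypothetical types (not the pebble shard), may make the reasoning easier to see in isolation:

```go
package readcoord // hypothetical example package, for illustration only

import "sync"

type lockedMap struct {
	mu   sync.RWMutex
	data map[string][]byte
}

// getOrPlanRead returns the cached value, or reports that the caller should
// schedule a read. The lookup is repeated under the write lock because another
// goroutine may have inserted the key between RUnlock and Lock, and a second
// map lookup is far cheaper than a block read from a file.
func (m *lockedMap) getOrPlanRead(k string) (v []byte, plan bool) {
	m.mu.RLock()
	v = m.data[k]
	m.mu.RUnlock()
	if v != nil {
		return v, false
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if v = m.data[k]; v != nil {
		return v, false
	}
	return nil, true // caller should plan the read (e.g. create a readEntry)
}
```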
func (c *Cache) Get(id ID, fileNum base.DiskFileNum, offset uint64) Handle { - return c.getShard(id, fileNum, offset).Get(id, fileNum, offset) + h, re := c.getShard(id, fileNum, offset).GetWithMaybeReadEntry( + id, fileNum, offset, false) + if invariants.Enabled && re != nil { + panic("readEntry should be nil") + } + return h +} + +// GetWithReadHandle retrieves the cache value for the specified ID, fileNum +// and offset. If found, a valid Handle is returned (with cacheHit set to +// true), else a valid ReadHandle is returned. +// +// See the ReadHandle declaration for the contract the caller must satisfy +// when getting a valid ReadHandle. +// +// This method can block before returning since multiple concurrent gets for +// the same cache value will take turns getting a ReadHandle, which represents +// permission to do the read. This blocking respects context cancellation, in +// which case an error is returned (and not a valid ReadHandle). +// +// When blocking, the errorDuration return value can be non-zero and is +// populated with the total duration that other readers that observed an error +// (see ReadHandle.SetReadError) spent in doing the read. This duration can be +// greater than the time spent blocked in this method, since some of these +// errors could have occurred prior to this call. But it serves as a rough +// indicator of whether turn taking could have caused higher latency due to +// context cancellation of other readers. +// +// While waiting, someone else may successfully read the value, which results +// in a valid Handle being returned. This is a case where cacheHit=false. +func (c *Cache) GetWithReadHandle( + ctx context.Context, id ID, fileNum base.DiskFileNum, offset uint64, +) (h Handle, rh ReadHandle, errorDuration time.Duration, cacheHit bool, err error) { + h, re := c.getShard(id, fileNum, offset).GetWithMaybeReadEntry( + id, fileNum, offset, true) + if h.Valid() { + return h, ReadHandle{}, 0, true, nil + } + h, errorDuration, err = re.waitForReadPermissionOrHandle(ctx) + if err != nil || h.Valid() { + return h, ReadHandle{}, errorDuration, false, err + } + return Handle{}, ReadHandle{entry: re}, errorDuration, false, nil } // Set sets the cache value for the specified file and offset, overwriting an diff --git a/internal/cache/read_shard.go b/internal/cache/read_shard.go new file mode 100644 index 0000000000..ef43e618fb --- /dev/null +++ b/internal/cache/read_shard.go @@ -0,0 +1,404 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/swiss" +) + +// readShard coordinates the read of a block that will be put in the cache. It +// ensures only one goroutine is reading a block, and other callers block +// until that goroutine is done (with success or failure). In the case of +// success, the other goroutines will use the value that was read, even if it +// is too large to be placed in the cache, or got evicted from the cache +// before they got scheduled. In the case of a failure (read error or context +// cancellation), one of the waiters will be given a turn to do the read. +// +// This turn-taking ensures that a large number of concurrent attempts to read +// the same block that is not in the cache does not result in the same number +// of reads from the filesystem (or remote storage). 
We have seen large spikes +// in memory usage (and CPU usage for memory allocation/deallocation) without +// this turn-taking. +// +// It introduces a small risk related to context cancellation -- if many +// readers assigned a turn exceed their deadline while doing the read and +// report an error, a reader with a longer deadline can unnecessarily wait. We +// accept this risk for now since the primary production use in CockroachDB is +// filesystem reads, where context cancellation is not respected. We do +// introduce an error duration metric emitted in traces that can be used to +// quantify such wasteful waiting. Note that this same risk extends to waiting +// on the Options.LoadBlockSema, so the error duration metric includes the +// case of an error when waiting on the semaphore (as a side effect of that +// waiting happening in the caller, sstable.Reader). +// +// Design choices and motivation: +// +// - readShard is tightly integrated with a cache shard: At its core, +// readShard is a map with synchronization. For the same reason the cache is +// sharded (for higher concurrency by sharding the mutex), it is beneficial +// to shard synchronization on readShard. By making readShard a member of +// shard, this sharding is trivially accomplished. Additionally, the code +// feels cleaner when there isn't a race between a cache miss, followed by +// creating a readEntry that is no longer needed because someone else has +// done the read since the miss and inserted into the cache. By making the +// readShard use shard.mu, such a race is avoided. A side benefit is that +// the cache interaction can be hidden behind readEntry.SetReadValue. One +// disadvantage of this tightly integrated design is that it does not +// encompass readers that will put the read value into a block.BufferPool -- +// we don't worry about those since block.BufferPool is only used for +// compactions and there is at most one compaction reader of a block. There +// is the possibility that the compaction reader and a user-facing iterator +// reader will do duplicate reads, but we accept that deficiency. +// +// - readMap is separate from shard.blocks map: One could have a design which +// extends the cache entry and unifies the two maps. However, we never want +// to evict a readEntry while there are readers waiting for the block read +// (including the case where the corresponding file is being removed from +// shard.files). Also, the number of stable cache entries is huge and +// therefore is manually allocated, while the number of readEntries is small +// (so manual allocation isn't necessary). For these reasons we maintain a +// separate map. This separation also results in more modular code, instead +// of piling more stuff into shard. +type readShard struct { + shard *shard + // Protected by shard.mu. + // + // shard.mu is never held when acquiring readEntry.mu. shard.mu is a shared + // resource and must be released quickly. + shardMu struct { + readMap swiss.Map[key, *readEntry] + } +} + +func (rs *readShard) Init(shard *shard) *readShard { + *rs = readShard{ + shard: shard, + } + // Choice of 16 is arbitrary. + rs.shardMu.readMap.Init(16) + return rs +} + +// getReadEntryLocked gets a *readEntry for (id, fileNum, offset). shard.mu is +// already write locked. 
+func (rs *readShard) getReadEntryLocked(id ID, fileNum base.DiskFileNum, offset uint64) *readEntry { + k := key{fileKey{id, fileNum}, offset} + e, ok := rs.shardMu.readMap.Get(k) + if !ok { + e = newReadEntry(rs, id, fileNum, offset) + rs.shardMu.readMap.Put(k, e) + } else { + e.refCount.acquireAllowZero() + } + return e +} + +// readEntry is used to coordinate between concurrent attempted readers of the +// same block. +type readEntry struct { + readShard *readShard + id ID + fileNum base.DiskFileNum + offset uint64 + mu struct { + sync.RWMutex + // v, when non-nil, has a ref from readEntry, which is unreffed when + // readEntry is deleted from the readMap. + v *Value + // isReading and ch together capture the state of whether someone has been + // granted a turn to read, and of readers waiting for that read to finish. + // ch is lazily allocated since most readEntries will not see concurrent + // readers. This lazy allocation results in one transition of ch from nil + // to non-nil, so waiters can read this non-nil ch and block on reading + // from it without holding mu. + // + // ch is written to, to signal one waiter to start doing the read. ch is + // closed when the value is successfully read and has been stored in v, so + // that all waiters wake up and read v. ch is a buffered channel with a + // capacity of 1. + // + // State transitions when trying to wait for turn: + // Case !isReading: + // set isReading=true; Drain the ch if non-nil and non-empty; proceed + // with turn to do the read. + // Case isReading: + // allocate ch if nil; wait on ch + // Finished reading successfully: + // set isReading=false; if ch is non-nil, close ch. + // Finished reading with failure: + // set isReading=false; if ch is non-nil, write to ch. + // + // INVARIANT: + // isReading => ch is nil or ch is empty. + isReading bool + ch chan struct{} + // Total duration of reads and semaphore waiting that resulted in error. + errorDuration time.Duration + readStart time.Time + } + // Count of ReadHandles that refer to this readEntry. Increments always hold + // shard.mu. So if this is found to be 0 while holding shard.mu, it is safe + // to delete readEntry from readShard.shardMu.readMap. + refCount refcnt +} + +var readEntryPool = sync.Pool{ + New: func() interface{} { + return &readEntry{} + }, +} + +func newReadEntry(rs *readShard, id ID, fileNum base.DiskFileNum, offset uint64) *readEntry { + e := readEntryPool.Get().(*readEntry) + *e = readEntry{ + readShard: rs, + id: id, + fileNum: fileNum, + offset: offset, + } + e.refCount.init(1) + return e +} + +// waitForReadPermissionOrHandle returns either an already read value (in +// Handle), an error (if the context was cancelled), or neither, which is a +// directive to the caller to do the read. In this last case the caller must +// call either setReadValue or setReadError. +// +// In all cases, errorDuration is populated with the total duration that +// readers that observed an error (setReadError) spent in doing the read. This +// duration can be greater than the time spend in waitForReadPermissionHandle, +// since some of these errors could have occurred prior to this call. But it +// serves as a rough indicator of whether turn taking could have caused higher +// latency due to context cancellation. 
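[Editorial note, not part of the diff] The isReading/ch state transitions described in the readEntry comment are easier to follow in isolation. Below is a condensed, hypothetical sketch of the same turn-taking pattern, without the refcounting, cache insertion, and error-duration bookkeeping that the real waitForReadPermissionOrHandle (next) interleaves; all names are invented for illustration:

```go
package readcoord // hypothetical example package, for illustration only

import (
	"context"
	"sync"
)

// turnTaker serializes loaders of a single key: one goroutine loads at a time,
// success is shared with every waiter, and a failure hands the turn to exactly
// one waiter.
type turnTaker struct {
	mu      sync.Mutex
	loading bool
	result  []byte        // non-nil once a load has succeeded
	ch      chan struct{} // lazily allocated, capacity 1; closed on success, sent to on failure
}

// wait returns (result, nil) if a load already succeeded, (nil, nil) if the
// caller has been granted the turn to load, or (nil, err) on cancellation.
func (t *turnTaker) wait(ctx context.Context) ([]byte, error) {
	t.mu.Lock()
	if t.result != nil {
		defer t.mu.Unlock()
		return t.result, nil
	}
	if !t.loading {
		t.loading = true
		if t.ch != nil {
			select { // drain a stale failure grant so no waiter also proceeds
			case <-t.ch:
			default:
			}
		}
		t.mu.Unlock()
		return nil, nil
	}
	if t.ch == nil {
		t.ch = make(chan struct{}, 1)
	}
	ch := t.ch
	t.mu.Unlock()

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case _, ok := <-ch:
		if !ok { // channel closed: a load succeeded
			t.mu.Lock()
			defer t.mu.Unlock()
			return t.result, nil
		}
		// Received the failure grant: this goroutine now holds the turn.
		t.mu.Lock()
		t.loading = true
		t.mu.Unlock()
		return nil, nil
	}
}

// succeed publishes the loaded value and wakes every waiter.
func (t *turnTaker) succeed(v []byte) {
	t.mu.Lock()
	t.result, t.loading = v, false
	if t.ch != nil {
		close(t.ch)
	}
	t.mu.Unlock()
}

// fail gives up the turn and wakes at most one waiter to retry the load. The
// channel is empty here because the current loader drained or consumed any
// earlier grant when it took the turn.
func (t *turnTaker) fail() {
	t.mu.Lock()
	t.loading = false
	if t.ch != nil {
		t.ch <- struct{}{}
	}
	t.mu.Unlock()
}
```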
+func (e *readEntry) waitForReadPermissionOrHandle( + ctx context.Context, +) (h Handle, errorDuration time.Duration, err error) { + constructHandleLocked := func() Handle { + if e.mu.v == nil { + panic("value is nil") + } + e.mu.v.acquire() + return Handle{value: e.mu.v} + } + becomeReaderLocked := func() { + if e.mu.v != nil { + panic("value is non-nil") + } + e.mu.isReading = true + if e.mu.ch != nil { + // Drain the channel, so that no one else mistakenly believes they + // should read. + select { + case <-e.mu.ch: + default: + } + } + e.mu.readStart = time.Now() + } + unlockAndUnrefAndTryRemoveFromMap := func(readLock bool) (errorDuration time.Duration) { + removeState := e.makeTryRemoveStateLocked() + errorDuration = e.mu.errorDuration + if readLock { + e.mu.RUnlock() + } else { + e.mu.Unlock() + } + unrefAndTryRemoveFromMap(removeState) + return errorDuration + } + e.mu.Lock() + if e.mu.v != nil { + // Value has already been read. + h := constructHandleLocked() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(false) + return h, errorDuration, nil + } + // Not already read. Wait for turn to do the read or for someone else to do + // the read. + if !e.mu.isReading { + // Have permission to do the read. + becomeReaderLocked() + errorDuration = e.mu.errorDuration + e.mu.Unlock() + return Handle{}, errorDuration, nil + } + if e.mu.ch == nil { + // Rare case when multiple readers are concurrently trying to read. If + // this turns out to be common enough we could use a sync.Pool. + e.mu.ch = make(chan struct{}, 1) + } + ch := e.mu.ch + e.mu.Unlock() + select { + case <-ctx.Done(): + e.mu.Lock() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(false) + return Handle{}, errorDuration, ctx.Err() + case _, ok := <-ch: + if ok { + // Granted permission to do the read. + e.mu.Lock() + becomeReaderLocked() + errorDuration = e.mu.errorDuration + e.mu.Unlock() + return Handle{}, errorDuration, nil + } else { + // Channel closed, so value was read. + e.mu.RLock() + if e.mu.v == nil { + panic("value is nil") + } + h := constructHandleLocked() + errorDuration = unlockAndUnrefAndTryRemoveFromMap(true) + return h, errorDuration, nil + } + } +} + +// tryRemoveState captures the state needed by tryRemoveFromMap. The caller +// constructs it before calling unrefAndTryRemoveFromMap since it typically +// held readEntry.mu, which avoids acquiring it again in +// unrefAndTryRemoveFromMap. +type tryRemoveState struct { + rs *readShard + k key + e *readEntry +} + +// makeTryRemoveStateLocked initializes tryRemoveState. +func (e *readEntry) makeTryRemoveStateLocked() tryRemoveState { + return tryRemoveState{ + rs: e.readShard, + k: key{fileKey{e.id, e.fileNum}, e.offset}, + e: e, + } +} + +// unrefAndTryRemoveFromMap tries to remove s.k => s.e from the map in s.rs. +// It is possible that after unreffing that s.e has already been removed, and +// is now back in the sync.Pool, or being reused (for the same or different +// key). This is because after unreffing, which caused the s.e.refCount to +// become zero, but before acquiring shard.mu, it could have been incremented +// and decremented concurrently, and some other goroutine could have observed +// a different decrement to 0, and raced ahead and deleted s.e from the +// readMap. +func unrefAndTryRemoveFromMap(s tryRemoveState) { + if !s.e.refCount.release() { + return + } + s.rs.shard.mu.Lock() + e2, ok := s.rs.shardMu.readMap.Get(s.k) + if !ok || e2 != s.e { + // Already removed. 
+ s.rs.shard.mu.Unlock() + return + } + if s.e.refCount.value() != 0 { + s.rs.shard.mu.Unlock() + return + } + // k => e and e.refCount == 0. And it cannot be incremented since + // shard.mu.Lock() is held. So remove from map. + s.rs.shardMu.readMap.Delete(s.k) + s.rs.shard.mu.Unlock() + + // Free s.e. + s.e.mu.Lock() + if s.e.mu.v != nil { + s.e.mu.v.release() + s.e.mu.v = nil + } + s.e.mu.Unlock() + *s.e = readEntry{} + readEntryPool.Put(s.e) +} + +func (e *readEntry) setReadValue(v *Value) Handle { + // Add to the cache before taking another ref for readEntry, since the cache + // expects ref=1 when it is called. + // + // TODO(sumeer): if e.refCount > 1, we should consider overriding to ensure + // that it is added as etHot. The common case will be e.refCount = 1, and we + // don't want to acquire e.mu twice, so one way to do this would be relax + // the invariant in shard.Set that requires Value.refs() == 1. Then we can + // do the work under e.mu before calling shard.Set. + h := e.readShard.shard.Set(e.id, e.fileNum, e.offset, v) + e.mu.Lock() + // Acquire a ref for readEntry, since we are going to remember it in e.mu.v. + v.acquire() + e.mu.v = v + if !e.mu.isReading { + panic("isReading is false") + } + e.mu.isReading = false + if e.mu.ch != nil { + // Inform all waiters so they can use e.mu.v. Not all readers have called + // readEntry.waitForReadPermissionOrHandle, and those will also use + // e.mu.v. + close(e.mu.ch) + } + removeState := e.makeTryRemoveStateLocked() + e.mu.Unlock() + unrefAndTryRemoveFromMap(removeState) + return h +} + +func (e *readEntry) setReadError(err error) { + e.mu.Lock() + if !e.mu.isReading { + panic("isReading is false") + } + e.mu.isReading = false + if e.mu.ch != nil { + select { + case e.mu.ch <- struct{}{}: + default: + panic("channel is not empty") + } + } + e.mu.errorDuration += time.Since(e.mu.readStart) + removeState := e.makeTryRemoveStateLocked() + e.mu.Unlock() + unrefAndTryRemoveFromMap(removeState) +} + +// ReadHandle represents a contract with a caller that had a miss when doing a +// cache lookup, and wants to do a read and insert the read block into the +// cache. The contract applies when ReadHandle.Valid returns true, in which +// case the caller has been assigned the turn to do the read (and others are +// potentially waiting for it). +// +// Contract: +// +// The caller must immediately start doing a read, or can first wait on a +// shared resource that would also block a different reader if it was assigned +// the turn instead (specifically, this refers to Options.LoadBlockSema). +// After the read, it must either call SetReadValue or SetReadError depending +// on whether the read succeeded or failed. +type ReadHandle struct { + entry *readEntry +} + +// Valid returns true for a valid ReadHandle. +func (rh ReadHandle) Valid() bool { + return rh.entry != nil +} + +// SetReadValue provides the Value that the caller has read. The caller is +// responsible for releasing the returned Handle when it is no longer needed. +func (rh ReadHandle) SetReadValue(v *Value) Handle { + return rh.entry.setReadValue(v) +} + +// SetReadError specifies that the caller has encountered a read error. 
+func (rh ReadHandle) SetReadError(err error) { + rh.entry.setReadError(err) +} diff --git a/internal/cache/refcnt_normal.go b/internal/cache/refcnt_normal.go index 9ab3348613..94beb232f0 100644 --- a/internal/cache/refcnt_normal.go +++ b/internal/cache/refcnt_normal.go @@ -37,6 +37,15 @@ func (v *refcnt) acquire() { } } +// acquireAllowZero is the same as acquire, but allows acquireAllowZero to be +// called with a zero refcnt. This is useful for cases where the entry which +// is being reference counted is inside a container and the container does not +// hold a reference. The container uses release() returning true to attempt to +// do a cleanup from the map. +func (v *refcnt) acquireAllowZero() { + v.val.Add(1) +} + func (v *refcnt) release() bool { switch v := v.val.Add(-1); { case v < 0: @@ -48,6 +57,10 @@ func (v *refcnt) release() bool { } } +func (v *refcnt) value() int32 { + return v.val.Load() +} + func (v *refcnt) trace(msg string) { } diff --git a/internal/cache/refcnt_tracing.go b/internal/cache/refcnt_tracing.go index 1d5e6c0219..a0329c69a7 100644 --- a/internal/cache/refcnt_tracing.go +++ b/internal/cache/refcnt_tracing.go @@ -51,6 +51,10 @@ func (v *refcnt) release() bool { return n == 0 } +func (v *refcnt) value() int32 { + return v.val.Load() +} + func (v *refcnt) trace(msg string) { s := fmt.Sprintf("%s: refs=%d\n%s", msg, v.refs(), debug.Stack()) v.Lock() diff --git a/sstable/block/buffer_pool.go b/sstable/block/buffer_pool.go index 38809b41ed..80bf550cd0 100644 --- a/sstable/block/buffer_pool.go +++ b/sstable/block/buffer_pool.go @@ -8,6 +8,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" + "github.com/cockroachdb/pebble/internal/invariants" ) // Alloc allocates a new Value for a block of length n (excluding the block @@ -52,12 +53,22 @@ func (b Value) BlockMetadata() *Metadata { // backed by a buffer pool, MakeHandle inserts the value into the block cache, // returning a handle to the now resident value. func (b Value) MakeHandle( - c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, + crh cache.ReadHandle, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, ) BufferHandle { if b.buf.Valid() { + if invariants.Enabled && crh.Valid() { + panic("cache.ReadHandle was valid") + } return BufferHandle{b: b.buf} } - return BufferHandle{h: c.Set(cacheID, fileNum, offset, b.v)} + return BufferHandle{h: crh.SetReadValue(b.v)} +} + +func (b Value) SetInCacheAndReleaseForTesting( + c *cache.Cache, cacheID cache.ID, fileNum base.DiskFileNum, offset uint64, +) { + h := c.Set(cacheID, fileNum, offset, b.v) + h.Release() } // Release releases the handle. 
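[Editorial note, not part of the diff] The ReadHandle contract is easiest to see from the caller's side. A minimal sketch, assuming the package layout in this diff, is below; readBlockFromDisk is a hypothetical stand-in for the file read, checksum check, and decompression that the real caller, sstable.(*Reader).readBlockInternal (later in this diff), performs:

```go
package readcoord // hypothetical example package, for illustration only

import (
	"context"

	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/cache"
)

// readBlockFromDisk is a hypothetical stand-in for the actual read path.
var readBlockFromDisk func(ctx context.Context, f base.DiskFileNum, off uint64) (*cache.Value, error)

// getOrReadBlock returns a cache handle for the block, reading it at most once
// across concurrent callers.
func getOrReadBlock(
	ctx context.Context, c *cache.Cache, id cache.ID, f base.DiskFileNum, off uint64,
) (cache.Handle, error) {
	h, rh, _, _, err := c.GetWithReadHandle(ctx, id, f, off)
	if err != nil {
		// Context was cancelled while waiting for a turn; no obligations remain.
		return cache.Handle{}, err
	}
	if h.Valid() {
		// Cache hit, or another goroutine completed the read while we waited.
		return h, nil
	}
	// rh is valid: we hold the turn and must call SetReadValue or SetReadError.
	v, err := readBlockFromDisk(ctx, f, off)
	if err != nil {
		rh.SetReadError(err) // hands the turn to one waiter, if any
		return cache.Handle{}, err
	}
	// Inserts v into the cache, wakes all waiters, and returns our handle.
	return rh.SetReadValue(v), nil
}
```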
diff --git a/sstable/colblk/index_block_test.go b/sstable/colblk/index_block_test.go index c7e7d6a3dc..970535455e 100644 --- a/sstable/colblk/index_block_test.go +++ b/sstable/colblk/index_block_test.go @@ -125,7 +125,7 @@ func TestIndexIterInitHandle(t *testing.T) { d := (*IndexBlockDecoder)(unsafe.Pointer(v.BlockMetadata())) d.Init(blockData) - v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release() + v.SetInCacheAndReleaseForTesting(c, cache.ID(1), base.DiskFileNum(1), 0) getBlockAndIterate := func(it *IndexIter) { h := c.Get(cache.ID(1), base.DiskFileNum(1), 0) diff --git a/sstable/colblk/keyspan_test.go b/sstable/colblk/keyspan_test.go index 9592dabcc1..a306e7f192 100644 --- a/sstable/colblk/keyspan_test.go +++ b/sstable/colblk/keyspan_test.go @@ -88,7 +88,7 @@ func TestKeyspanBlockPooling(t *testing.T) { copy(v.BlockData(), b) d := (*KeyspanDecoder)(unsafe.Pointer(v.BlockMetadata())) d.Init(v.BlockData()) - v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release() + v.SetInCacheAndReleaseForTesting(c, cache.ID(1), base.DiskFileNum(1), 0) getBlockAndIterate := func() { h := c.Get(cache.ID(1), base.DiskFileNum(1), 0) diff --git a/sstable/reader.go b/sstable/reader.go index 8955b054eb..9e4aa3430f 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -499,17 +499,48 @@ func (r *Reader) readBlockInternal( bh block.Handle, initBlockMetadataFn func(*block.Metadata, []byte) error, ) (handle block.BufferHandle, _ error) { - if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Valid() { - // Cache hit. - if readHandle != nil { - readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen)) + var ch cache.Handle + var crh cache.ReadHandle + hit := true + if env.BufferPool == nil { + var errorDuration time.Duration + var err error + ch, crh, errorDuration, hit, err = r.cacheOpts.Cache.GetWithReadHandle( + ctx, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + if errorDuration > 5*time.Millisecond && r.logger.IsTracingEnabled(ctx) { + r.logger.Eventf( + ctx, "waited for turn when %s time wasted by failed reads", errorDuration.String()) + } + // TODO(sumeer): consider tracing when waited longer than some duration + // for turn to do the read. + if err != nil { + return block.BufferHandle{}, err + } + } else { + // The compaction path uses env.BufferPool, and does not coordinate read + // using a cache.ReadHandle. This is ok since only a single compaction is + // reading a block. + ch = r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + if ch.Valid() { + hit = true } - env.BlockServedFromCache(bh.Length) - return block.CacheBufferHandle(h), nil + } + // INVARIANT: hit => ch.Valid() + if ch.Valid() { + if hit { + // Cache hit. + if readHandle != nil { + readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen)) + } + env.BlockServedFromCache(bh.Length) + } + if invariants.Enabled && crh.Valid() { + panic("cache.ReadHandle must not be valid") + } + return block.CacheBufferHandle(ch), nil } - // Cache miss. - + // Need to read. First acquire loadBlockSema, if needed. if sema := r.loadBlockSema; sema != nil { if err := sema.Acquire(ctx, 1); err != nil { // An error here can only come from the context. 
@@ -517,7 +548,26 @@ func (r *Reader) readBlockInternal( } defer sema.Release(1) } + value, err := r.doRead(ctx, env, readHandle, bh, initBlockMetadataFn) + if err != nil { + if crh.Valid() { + crh.SetReadError(err) + } + return block.BufferHandle{}, err + } + h := value.MakeHandle(crh, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) + return h, nil +} +// doRead is a helper for readBlockInternal that does the read, checksum +// check, decompression, and returns either a block.Value or an error. +func (r *Reader) doRead( + ctx context.Context, + env readBlockEnv, + readHandle objstorage.ReadHandle, + bh block.Handle, + initBlockMetadataFn func(*block.Metadata, []byte) error, +) (block.Value, error) { compressed := block.Alloc(int(bh.Length+block.TrailerLen), env.BufferPool) readStopwatch := makeStopwatch() var err error @@ -540,17 +590,15 @@ func (r *Reader) readBlockInternal( } if err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } env.BlockRead(bh.Length, readDuration) - if err := checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil { + if err = checkChecksum(r.checksumType, compressed.BlockData(), bh, r.cacheOpts.FileNum); err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - typ := block.CompressionIndicator(compressed.BlockData()[bh.Length]) compressed.Truncate(int(bh.Length)) - var decompressed block.Value if typ == block.NoCompressionIndicator { decompressed = compressed @@ -559,23 +607,21 @@ func (r *Reader) readBlockInternal( decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.BlockData()) if err != nil { compressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - decompressed = block.Alloc(decodedLen, env.BufferPool) err = block.DecompressInto(typ, compressed.BlockData()[prefixLen:], decompressed.BlockData()) compressed.Release() if err != nil { decompressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } } - if err := initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil { + if err = initBlockMetadataFn(decompressed.BlockMetadata(), decompressed.BlockData()); err != nil { decompressed.Release() - return block.BufferHandle{}, err + return block.Value{}, err } - h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) - return h, nil + return decompressed, nil } func (r *Reader) readMetaindex(
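[Editorial note, not part of the diff] Tying the pieces together, here is a hypothetical illustration of the behavior this change targets: many goroutines racing on the same uncached block should perform a single underlying read, with every other goroutine served the value that read produced. This is a sketch, not a test from this change; cache.New and cache.Alloc are assumed to exist with their existing signatures in the cache package.

```go
package readcoord // hypothetical example package, for illustration only

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/cache"
)

func demoSingleRead() {
	c := cache.New(1 << 20) // assumed constructor from the cache package
	defer c.Unref()
	id, f, off := cache.ID(1), base.DiskFileNum(1), uint64(0)

	var reads atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h, rh, _, _, err := c.GetWithReadHandle(context.Background(), id, f, off)
			if err != nil {
				return
			}
			if h.Valid() {
				// Either a cache hit or another goroutine's read was shared with us.
				h.Release()
				return
			}
			// We were granted the turn: "read" the block and publish it.
			reads.Add(1)
			v := cache.Alloc(64) // assumed cache.Value allocator
			rh.SetReadValue(v).Release()
		}()
	}
	wg.Wait()
	fmt.Println("underlying reads:", reads.Load()) // expected: 1
}
```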