Skip to content

Commit

Permalink
table cache: use a long-lived ReaderProvider
Browse files Browse the repository at this point in the history
An `sstable.ReaderProvider` is necessary whenever an sstable has value
blocks. Currently, we allocate one every time we create a point
iterator.

This commit makes the `tableCacheShardReaderProvider` usable in
parallel and maintains a long-lived instance in the `tableCacheValue`.
  • Loading branch information
RaduBerinde committed Nov 10, 2024
1 parent 461c46d commit 424d46d
Showing 1 changed file with 70 additions and 35 deletions.
105 changes: 70 additions & 35 deletions table_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,6 @@ func (c *tableCacheShard) newPointIter(
if err != nil {
return nil, err
}
var rp sstable.ReaderProvider
if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
}

if v.isShared && file.SyntheticSeqNum() != 0 {
if tableFormat < sstable.TableFormatPebblev4 {
Expand All @@ -577,11 +573,11 @@ func (c *tableCacheShard) newPointIter(
uint64(uintptr(unsafe.Pointer(v.reader))), opts.Category)
}
if internalOpts.compaction {
iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, rp, internalOpts.bufferPool)
iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, &v.readerProvider, internalOpts.bufferPool)
} else {
iter, err = cr.NewPointIter(
ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit,
internalOpts.stats, iterStatsAccum, rp)
internalOpts.stats, iterStatsAccum, &v.readerProvider)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -652,15 +648,36 @@ func (c *tableCacheShard) newRangeKeyIter(
return cr.NewRawRangeKeyIter(ctx, transforms)
}

// tableCacheShardReaderProvider implements sstable.ReaderProvider for a
// specific table.
type tableCacheShardReaderProvider struct {
c *tableCacheShard
file *manifest.FileMetadata
dbOpts *tableCacheOpts
v *tableCacheValue
c *tableCacheShard
dbOpts *tableCacheOpts
backingFileNum base.DiskFileNum

mu struct {
sync.Mutex
// v is the result of findNode. Whenever it is not null, we hold a refcount
// on the tableCacheValue.
v *tableCacheValue
// refCount is the number of GetReader() calls that have not received a
// corresponding Close().
refCount int
}
}

var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}

func (rp *tableCacheShardReaderProvider) init(
c *tableCacheShard, dbOpts *tableCacheOpts, backingFileNum base.DiskFileNum,
) {
rp.c = c
rp.dbOpts = dbOpts
rp.backingFileNum = backingFileNum
rp.mu.v = nil
rp.mu.refCount = 0
}

// GetReader implements sstable.ReaderProvider. Note that it is not the
// responsibility of tableCacheShardReaderProvider to ensure that the file
// continues to exist. The ReaderProvider is used in iterators where the
Expand All @@ -676,21 +693,38 @@ var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
// TODO(bananabrick): We could return a wrapper over the Reader to ensure
// that the reader isn't used for other purposes.
func (rp *tableCacheShardReaderProvider) GetReader(ctx context.Context) (*sstable.Reader, error) {
rp.mu.Lock()
defer rp.mu.Unlock()

if rp.mu.v != nil {
rp.mu.refCount++
return rp.mu.v.reader, nil
}

// Calling findNode gives us the responsibility of decrementing v's
// refCount.
v := rp.c.findNode(ctx, rp.file.FileBacking, rp.dbOpts)
v := rp.c.findNodeInternal(ctx, rp.backingFileNum, rp.dbOpts)
if v.err != nil {
defer rp.c.unrefValue(v)
return nil, v.err
}
rp.v = v
rp.mu.v = v
rp.mu.refCount = 1
return v.reader, nil
}

// Close implements sstable.ReaderProvider.
func (rp *tableCacheShardReaderProvider) Close() {
rp.c.unrefValue(rp.v)
rp.v = nil
rp.mu.Lock()
defer rp.mu.Unlock()
rp.mu.refCount--
if rp.mu.refCount <= 0 {
if rp.mu.refCount < 0 {
panic("pebble: sstable.ReaderProvider misuse")
}
rp.c.unrefValue(rp.mu.v)
rp.mu.v = nil
}
}

// getTableProperties return sst table properties for target file
Expand Down Expand Up @@ -782,20 +816,18 @@ func (c *tableCacheShard) findNode(
// Caution! Here fileMetadata can be a physical or virtual table. Table cache
// readers are associated with the physical backings. All virtual tables with
// the same backing will use the same reader from the cache; so no information
// that can differ among these virtual tables can be plumbed into loadInfo.
info := loadInfo{
backingFileNum: b.DiskFileNum,
}
// that can differ among these virtual tables can be passed to findNodeInternal.
backingFileNum := b.DiskFileNum

return c.findNodeInternal(ctx, info, dbOpts)
return c.findNodeInternal(ctx, backingFileNum, dbOpts)
}

func (c *tableCacheShard) findNodeInternal(
ctx context.Context, loadInfo loadInfo, dbOpts *tableCacheOpts,
ctx context.Context, backingFileNum base.DiskFileNum, dbOpts *tableCacheOpts,
) *tableCacheValue {
// Fast-path for a hit in the cache.
c.mu.RLock()
key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
key := tableCacheKey{dbOpts.cacheID, backingFileNum}
if n := c.mu.nodes[key]; n != nil && n.value != nil {
// Fast-path hit.
//
Expand All @@ -817,7 +849,7 @@ func (c *tableCacheShard) findNodeInternal(
case n == nil:
// Slow-path miss of a non-existent node.
n = &tableCacheNode{
fileNum: loadInfo.backingFileNum,
fileNum: backingFileNum,
ptype: tableCacheNodeCold,
}
c.addNode(n, dbOpts)
Expand Down Expand Up @@ -854,6 +886,7 @@ func (c *tableCacheShard) findNodeInternal(
v := &tableCacheValue{
loaded: make(chan struct{}),
}
v.readerProvider.init(c, dbOpts, backingFileNum)
v.refCount.Store(2)
// Cache the closure invoked when an iterator is closed. This avoids an
// allocation on every call to newIters.
Expand All @@ -875,7 +908,7 @@ func (c *tableCacheShard) findNodeInternal(
// Note adding to the cache lists must complete before we begin loading the
// table as a failure during load will result in the node being unlinked.
pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
v.load(ctx, loadInfo, c, dbOpts)
v.load(ctx, backingFileNum, c, dbOpts)
})
return v
}
Expand Down Expand Up @@ -1094,51 +1127,53 @@ type tableCacheValue struct {
closeHook func(i sstable.Iterator) error
reader *sstable.Reader
err error
isShared bool
loaded chan struct{}
// Reference count for the value. The reader is closed when the reference
// count drops to zero.
refCount atomic.Int32
}

// loadInfo contains the information needed to populate a new cache entry.
type loadInfo struct {
backingFileNum base.DiskFileNum
isShared bool

// readerProvider is embedded here so that we only allocate it once as long as
// the table stays in the cache. Its state is not always logically tied to
// this specific tableCacheValue - if a table goes out of the cache and then
// comes back in, the readerProvider in a now-defunct tableCacheValue can
// still be used and will internally refer to the new tableCacheValue.
readerProvider tableCacheShardReaderProvider
}

func (v *tableCacheValue) load(
ctx context.Context, loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts,
ctx context.Context, backingFileNum base.DiskFileNum, c *tableCacheShard, dbOpts *tableCacheOpts,
) {
// Try opening the file first.
var f objstorage.Readable
var err error
f, err = dbOpts.objProvider.OpenForReading(
ctx, fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
ctx, fileTypeTable, backingFileNum, objstorage.OpenOptions{MustExist: true},
)
if err == nil {
o := dbOpts.readerOpts
o.SetInternalCacheOpts(sstableinternal.CacheOptions{
Cache: dbOpts.cache,
CacheID: dbOpts.cacheID,
FileNum: loadInfo.backingFileNum,
FileNum: backingFileNum,
})
v.reader, err = sstable.NewReader(ctx, f, o)
}
if err == nil {
var objMeta objstorage.ObjectMetadata
objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, loadInfo.backingFileNum)
objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, backingFileNum)
v.isShared = objMeta.IsShared()
}
if err != nil {
v.err = errors.Wrapf(
err, "pebble: backing file %s error", loadInfo.backingFileNum)
err, "pebble: backing file %s error", backingFileNum)
}
if v.err != nil {
c.mu.Lock()
defer c.mu.Unlock()
// Lookup the node in the cache again as it might have already been
// removed.
key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
key := tableCacheKey{dbOpts.cacheID, backingFileNum}
n := c.mu.nodes[key]
if n != nil && n.value == v {
c.releaseNode(n)
Expand Down

0 comments on commit 424d46d

Please sign in to comment.