From 424d46d1147775f369f3be91ff4417110476ff5d Mon Sep 17 00:00:00 2001
From: Radu Berinde
Date: Sun, 10 Nov 2024 06:55:54 -0800
Subject: [PATCH] table cache: use a long-lived ReaderProvider

An `sstable.ReaderProvider` is necessary whenever an sstable has value
blocks. Currently, we allocate one every time we create a point
iterator.

This commit makes the `tableCacheShardReaderProvider` usable in
parallel and maintains a long-lived instance in the `tableCacheValue`.
---
 table_cache.go | 105 ++++++++++++++++++++++++++++++++-----------------
 1 file changed, 70 insertions(+), 35 deletions(-)

diff --git a/table_cache.go b/table_cache.go
index d9cfd8a3fc..53ceef7b14 100644
--- a/table_cache.go
+++ b/table_cache.go
@@ -557,10 +557,6 @@ func (c *tableCacheShard) newPointIter(
 	if err != nil {
 		return nil, err
 	}
-	var rp sstable.ReaderProvider
-	if tableFormat >= sstable.TableFormatPebblev3 && v.reader.Properties.NumValueBlocks > 0 {
-		rp = &tableCacheShardReaderProvider{c: c, file: file, dbOpts: dbOpts}
-	}
 
 	if v.isShared && file.SyntheticSeqNum() != 0 {
 		if tableFormat < sstable.TableFormatPebblev4 {
@@ -577,11 +573,11 @@ func (c *tableCacheShard) newPointIter(
 			uint64(uintptr(unsafe.Pointer(v.reader))), opts.Category)
 	}
 	if internalOpts.compaction {
-		iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, rp, internalOpts.bufferPool)
+		iter, err = cr.NewCompactionIter(transforms, iterStatsAccum, &v.readerProvider, internalOpts.bufferPool)
 	} else {
 		iter, err = cr.NewPointIter(
 			ctx, transforms, opts.GetLowerBound(), opts.GetUpperBound(), filterer, filterBlockSizeLimit,
-			internalOpts.stats, iterStatsAccum, rp)
+			internalOpts.stats, iterStatsAccum, &v.readerProvider)
 	}
 	if err != nil {
 		return nil, err
@@ -652,15 +648,36 @@ func (c *tableCacheShard) newRangeKeyIter(
 	return cr.NewRawRangeKeyIter(ctx, transforms)
 }
 
+// tableCacheShardReaderProvider implements sstable.ReaderProvider for a
+// specific table.
 type tableCacheShardReaderProvider struct {
-	c      *tableCacheShard
-	file   *manifest.FileMetadata
-	dbOpts *tableCacheOpts
-	v      *tableCacheValue
+	c              *tableCacheShard
+	dbOpts         *tableCacheOpts
+	backingFileNum base.DiskFileNum
+
+	mu struct {
+		sync.Mutex
+		// v is the result of findNode. Whenever it is not null, we hold a refcount
+		// on the tableCacheValue.
+		v *tableCacheValue
+		// refCount is the number of GetReader() calls that have not received a
+		// corresponding Close().
+		refCount int
+	}
 }
 
 var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
 
+func (rp *tableCacheShardReaderProvider) init(
+	c *tableCacheShard, dbOpts *tableCacheOpts, backingFileNum base.DiskFileNum,
+) {
+	rp.c = c
+	rp.dbOpts = dbOpts
+	rp.backingFileNum = backingFileNum
+	rp.mu.v = nil
+	rp.mu.refCount = 0
+}
+
 // GetReader implements sstable.ReaderProvider. Note that it is not the
 // responsibility of tableCacheShardReaderProvider to ensure that the file
 // continues to exist. The ReaderProvider is used in iterators where the
@@ -676,21 +693,38 @@ var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{}
 // TODO(bananabrick): We could return a wrapper over the Reader to ensure
 // that the reader isn't used for other purposes.
 func (rp *tableCacheShardReaderProvider) GetReader(ctx context.Context) (*sstable.Reader, error) {
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+
+	if rp.mu.v != nil {
+		rp.mu.refCount++
+		return rp.mu.v.reader, nil
+	}
+
 	// Calling findNode gives us the responsibility of decrementing v's
 	// refCount.
-	v := rp.c.findNode(ctx, rp.file.FileBacking, rp.dbOpts)
+	v := rp.c.findNodeInternal(ctx, rp.backingFileNum, rp.dbOpts)
 	if v.err != nil {
 		defer rp.c.unrefValue(v)
 		return nil, v.err
 	}
-	rp.v = v
+	rp.mu.v = v
+	rp.mu.refCount = 1
 	return v.reader, nil
 }
 
 // Close implements sstable.ReaderProvider.
 func (rp *tableCacheShardReaderProvider) Close() {
-	rp.c.unrefValue(rp.v)
-	rp.v = nil
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+	rp.mu.refCount--
+	if rp.mu.refCount <= 0 {
+		if rp.mu.refCount < 0 {
+			panic("pebble: sstable.ReaderProvider misuse")
+		}
+		rp.c.unrefValue(rp.mu.v)
+		rp.mu.v = nil
+	}
 }
 
 // getTableProperties return sst table properties for target file
@@ -782,20 +816,18 @@ func (c *tableCacheShard) findNode(
 	// Caution! Here fileMetadata can be a physical or virtual table. Table cache
 	// readers are associated with the physical backings. All virtual tables with
 	// the same backing will use the same reader from the cache; so no information
-	// that can differ among these virtual tables can be plumbed into loadInfo.
-	info := loadInfo{
-		backingFileNum: b.DiskFileNum,
-	}
+	// that can differ among these virtual tables can be passed to findNodeInternal.
+	backingFileNum := b.DiskFileNum
 
-	return c.findNodeInternal(ctx, info, dbOpts)
+	return c.findNodeInternal(ctx, backingFileNum, dbOpts)
 }
 
 func (c *tableCacheShard) findNodeInternal(
-	ctx context.Context, loadInfo loadInfo, dbOpts *tableCacheOpts,
+	ctx context.Context, backingFileNum base.DiskFileNum, dbOpts *tableCacheOpts,
 ) *tableCacheValue {
 	// Fast-path for a hit in the cache.
 	c.mu.RLock()
-	key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
+	key := tableCacheKey{dbOpts.cacheID, backingFileNum}
 	if n := c.mu.nodes[key]; n != nil && n.value != nil {
 		// Fast-path hit.
 		//
@@ -817,7 +849,7 @@ func (c *tableCacheShard) findNodeInternal(
 	case n == nil:
 		// Slow-path miss of a non-existent node.
 		n = &tableCacheNode{
-			fileNum: loadInfo.backingFileNum,
+			fileNum: backingFileNum,
 			ptype:   tableCacheNodeCold,
 		}
 		c.addNode(n, dbOpts)
@@ -854,6 +886,7 @@ func (c *tableCacheShard) findNodeInternal(
 	v := &tableCacheValue{
 		loaded: make(chan struct{}),
 	}
+	v.readerProvider.init(c, dbOpts, backingFileNum)
 	v.refCount.Store(2)
 	// Cache the closure invoked when an iterator is closed. This avoids an
 	// allocation on every call to newIters.
@@ -875,7 +908,7 @@ func (c *tableCacheShard) findNodeInternal(
 	// Note adding to the cache lists must complete before we begin loading the
 	// table as a failure during load will result in the node being unlinked.
 	pprof.Do(context.Background(), tableCacheLabels, func(context.Context) {
-		v.load(ctx, loadInfo, c, dbOpts)
+		v.load(ctx, backingFileNum, c, dbOpts)
 	})
 	return v
 }
@@ -1094,51 +1127,53 @@ type tableCacheValue struct {
 	closeHook func(i sstable.Iterator) error
 	reader    *sstable.Reader
 	err       error
-	isShared  bool
 	loaded    chan struct{}
 	// Reference count for the value. The reader is closed when the reference
 	// count drops to zero.
 	refCount atomic.Int32
-}
-
-// loadInfo contains the information needed to populate a new cache entry.
-type loadInfo struct {
-	backingFileNum base.DiskFileNum
+	isShared bool
+
+	// readerProvider is embedded here so that we only allocate it once as long as
+	// the table stays in the cache. Its state is not always logically tied to
+	// this specific tableCacheValue - if a table goes out of the cache and then
+	// comes back in, the readerProvider in a now-defunct tableCacheValue can
+	// still be used and will internally refer to the new tableCacheValue.
+	readerProvider tableCacheShardReaderProvider
 }
 
 func (v *tableCacheValue) load(
-	ctx context.Context, loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts,
+	ctx context.Context, backingFileNum base.DiskFileNum, c *tableCacheShard, dbOpts *tableCacheOpts,
 ) {
 	// Try opening the file first.
 	var f objstorage.Readable
 	var err error
 	f, err = dbOpts.objProvider.OpenForReading(
-		ctx, fileTypeTable, loadInfo.backingFileNum, objstorage.OpenOptions{MustExist: true},
+		ctx, fileTypeTable, backingFileNum, objstorage.OpenOptions{MustExist: true},
 	)
 	if err == nil {
 		o := dbOpts.readerOpts
 		o.SetInternalCacheOpts(sstableinternal.CacheOptions{
 			Cache:   dbOpts.cache,
 			CacheID: dbOpts.cacheID,
-			FileNum: loadInfo.backingFileNum,
+			FileNum: backingFileNum,
 		})
 		v.reader, err = sstable.NewReader(ctx, f, o)
 	}
 	if err == nil {
 		var objMeta objstorage.ObjectMetadata
-		objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, loadInfo.backingFileNum)
+		objMeta, err = dbOpts.objProvider.Lookup(fileTypeTable, backingFileNum)
 		v.isShared = objMeta.IsShared()
 	}
 	if err != nil {
 		v.err = errors.Wrapf(
-			err, "pebble: backing file %s error", loadInfo.backingFileNum)
+			err, "pebble: backing file %s error", backingFileNum)
 	}
 	if v.err != nil {
 		c.mu.Lock()
		defer c.mu.Unlock()
 		// Lookup the node in the cache again as it might have already been
 		// removed.
-		key := tableCacheKey{dbOpts.cacheID, loadInfo.backingFileNum}
+		key := tableCacheKey{dbOpts.cacheID, backingFileNum}
 		n := c.mu.nodes[key]
 		if n != nil && n.value == v {
 			c.releaseNode(n)
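
Note: the new scheme amounts to a lazily acquired, reference-counted handle. The first GetReader takes a table cache reference via findNodeInternal, concurrent GetReader calls share that reference by bumping a local count under the mutex, and the last Close returns the reference to the cache. The following self-contained Go program is a minimal sketch of that contract; provider, cacheHandle, and acquire are hypothetical stand-ins for tableCacheShardReaderProvider, tableCacheValue, and findNodeInternal, not Pebble's actual API.

package main

import (
	"fmt"
	"sync"
)

// cacheHandle stands in for a ref-counted cache entry (the analogue of
// tableCacheValue); release stands in for tableCacheShard.unrefValue.
type cacheHandle struct {
	reader  string // stand-in for *sstable.Reader
	release func()
}

// provider holds at most one cache reference at a time, shared by all
// GetReader/Close pairs currently in flight.
type provider struct {
	acquire func() cacheHandle // stand-in for findNodeInternal

	mu struct {
		sync.Mutex
		h        *cacheHandle // non-nil while we hold a cache reference
		refCount int          // GetReader calls without a matching Close
	}
}

// GetReader returns the cached reader, taking a new cache reference only
// when no other GetReader call is outstanding.
func (p *provider) GetReader() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.mu.h != nil {
		p.mu.refCount++
		return p.mu.h.reader
	}
	h := p.acquire()
	p.mu.h = &h
	p.mu.refCount = 1
	return h.reader
}

// Close drops one local reference; the last Close releases the underlying
// cache reference, leaving the provider with no state.
func (p *provider) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.mu.refCount--
	if p.mu.refCount < 0 {
		panic("provider misuse: Close without GetReader")
	}
	if p.mu.refCount == 0 {
		p.mu.h.release()
		p.mu.h = nil
	}
}

func main() {
	p := &provider{
		acquire: func() cacheHandle {
			fmt.Println("cache ref taken")
			return cacheHandle{
				reader:  "reader",
				release: func() { fmt.Println("cache ref released") },
			}
		},
	}
	r1 := p.GetReader() // takes the cache reference
	r2 := p.GetReader() // shares it; refCount is now 2
	fmt.Println(r1, r2)
	p.Close()
	p.Close() // last Close releases the cache reference
}

Because a balanced GetReader/Close sequence leaves the provider holding no state, a single long-lived instance can be embedded in tableCacheValue and handed to every iterator over that table, which is what lets the patch drop the per-iterator ReaderProvider allocation.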