Skip to content

Commit

Permalink
sstable: don't store ReaderOptions in Reader
Browse files Browse the repository at this point in the history
We keep Readers in the table cache so we don't want their size to be
unnecessarily large. We reduce the size by not storing the full
`ReaderOptions` inside the `Reader`; many of the fields are only used
inside `NewReader`.
  • Loading branch information
RaduBerinde committed Jul 9, 2024
1 parent 963a635 commit 26fe5a0
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 66 deletions.
5 changes: 1 addition & 4 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -1954,11 +1954,8 @@ func (o *Options) MakeReaderOptions() sstable.ReaderOptions {
if o != nil {
readerOpts.LoadBlockSema = o.LoadBlockSema
readerOpts.Comparer = o.Comparer
readerOpts.Merger = o.Merger
readerOpts.Filters = o.Filters
if o.Merger != nil {
readerOpts.Merge = o.Merger.Merge
readerOpts.MergerName = o.Merger.Name
}
readerOpts.LoggerAndTracer = o.LoggerAndTracer
}
return readerOpts
Expand Down
16 changes: 4 additions & 12 deletions sstable/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ type ReaderOptions struct {
// The default value uses the same ordering as bytes.Compare.
Comparer *Comparer

// Merge defines the Merge function in use for this keyspace.
Merge base.Merge
// Merger defines the merge operation in use for this keyspace. Its Name is
// checked for consistency with the merger name stored in the sstable.
Merger *Merger

Comparers Comparers
Mergers Mergers
Expand All @@ -121,11 +121,6 @@ type ReaderOptions struct {
// policies that are not in this map will be ignored.
Filters map[string]FilterPolicy

// Merger defines the associative merge operation to use for merging values
// written with {Batch,DB}.Merge. The MergerName is checked for consistency
// with the value stored in the sstable when it was written.
MergerName string

// LoggerAndTracer is an optional logger and tracer.
LoggerAndTracer base.LoggerAndTracer

Expand Down Expand Up @@ -154,11 +149,8 @@ func (o ReaderOptions) ensureDefaults() ReaderOptions {
if o.Comparer == nil {
o.Comparer = base.DefaultComparer
}
if o.Merge == nil {
o.Merge = base.DefaultMerger.Merge
}
if o.MergerName == "" {
o.MergerName = base.DefaultMerger.Name
if o.Merger == nil {
o.Merger = base.DefaultMerger
}
if o.LoggerAndTracer == nil {
o.LoggerAndTracer = base.NoopLoggerAndTracer{}
Expand Down
92 changes: 54 additions & 38 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (

"github.com/cespare/xxhash/v2"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/fifo"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/bytealloc"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/crc"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/sstableinternal"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
Expand Down Expand Up @@ -87,8 +89,24 @@ type blockTransform func([]byte) ([]byte, error)

// Reader is a table reader.
type Reader struct {
readable objstorage.Readable
err error
readable objstorage.Readable

// The following fields are copied from the ReaderOptions.
cacheOpts sstableinternal.CacheOptions
loadBlockSema *fifo.Semaphore
deniedUserProperties map[string]struct{}
filterMetricsTracker *FilterMetricsTracker
logger base.LoggerAndTracer

Compare Compare
Equal Equal
FormatKey base.FormatKey
Split Split

tableFilter *tableFilterReader

err error

indexBH block.Handle
filterBH block.Handle
rangeDelBH block.Handle
Expand All @@ -97,17 +115,11 @@ type Reader struct {
propertiesBH block.Handle
metaIndexBH block.Handle
footerBH block.Handle
opts ReaderOptions
Compare Compare
Equal Equal
FormatKey base.FormatKey
Split Split
tableFilter *tableFilterReader
// Keep types that are not multiples of 8 bytes at the end and with
// decreasing size.

Properties Properties
tableFormat TableFormat
checksumType block.ChecksumType

// metaBufferPool is a buffer pool used exclusively when opening a table and
// loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate
// the BufferPool.pool slice as a part of the Reader allocation. It's
Expand All @@ -122,7 +134,7 @@ var _ CommonReader = (*Reader)(nil)

// Close the reader and the underlying objstorage.Readable.
func (r *Reader) Close() error {
r.opts.internal.CacheOpts.Cache.Unref()
r.cacheOpts.Cache.Unref()

if r.readable != nil {
r.err = firstError(r.err, r.readable.Close())
Expand Down Expand Up @@ -412,8 +424,7 @@ func (r *Reader) readBlock(
iterStats *iterStatsAccumulator,
bufferPool *block.BufferPool,
) (handle block.BufferHandle, _ error) {
cacheOpts := r.opts.internal.CacheOpts
if h := cacheOpts.Cache.Get(cacheOpts.CacheID, cacheOpts.FileNum, bh.Offset); h.Get() != nil {
if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Get() != nil {
// Cache hit.
if readHandle != nil {
readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen))
Expand All @@ -432,7 +443,7 @@ func (r *Reader) readBlock(

// Cache miss.

if sema := r.opts.LoadBlockSema; sema != nil {
if sema := r.loadBlockSema; sema != nil {
if err := sema.Acquire(ctx, 1); err != nil {
// An error here can only come from the context.
return block.BufferHandle{}, err
Expand All @@ -457,8 +468,8 @@ func (r *Reader) readBlock(
}
// Call IsTracingEnabled to avoid the allocations of boxing integers into an
// interface{}, unless necessary.
if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) {
r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s",
if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) {
r.logger.Eventf(ctx, "reading %d bytes took %s",
int(bh.Length+block.TrailerLen), readDuration.String())
}
if stats != nil {
Expand All @@ -469,7 +480,7 @@ func (r *Reader) readBlock(
compressed.Release()
return block.BufferHandle{}, err
}
if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.opts.internal.CacheOpts.FileNum); err != nil {
if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.cacheOpts.FileNum); err != nil {
compressed.Release()
return block.BufferHandle{}, err
}
Expand Down Expand Up @@ -514,11 +525,13 @@ func (r *Reader) readBlock(
if iterStats != nil {
iterStats.reportStats(bh.Length, 0, readDuration)
}
h := decompressed.MakeHandle(cacheOpts.Cache, cacheOpts.CacheID, cacheOpts.FileNum, bh.Offset)
h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset)
return h, nil
}

func (r *Reader) readMetaindex(metaindexBH block.Handle, readHandle objstorage.ReadHandle) error {
func (r *Reader) readMetaindex(
metaindexBH block.Handle, readHandle objstorage.ReadHandle, filters map[string]FilterPolicy,
) error {
// We use a BufferPool when reading metaindex blocks in order to avoid
// populating the block cache with these blocks. In heavy-write workloads,
// especially with high compaction concurrency, new tables may be created
Expand Down Expand Up @@ -583,7 +596,7 @@ func (r *Reader) readMetaindex(metaindexBH block.Handle, readHandle objstorage.R
return err
}
r.propertiesBH = bh
err := r.Properties.load(b.Get(), r.opts.DeniedUserProperties)
err := r.Properties.load(b.Get(), r.deniedUserProperties)
b.Release()
if err != nil {
return err
Expand All @@ -607,7 +620,7 @@ func (r *Reader) readMetaindex(metaindexBH block.Handle, readHandle objstorage.R
r.rangeKeyBH = bh
}

for name, fp := range r.opts.Filters {
for name, fp := range filters {
types := []struct {
ftype FilterType
prefix string
Expand All @@ -621,7 +634,7 @@ func (r *Reader) readMetaindex(metaindexBH block.Handle, readHandle objstorage.R

switch t.ftype {
case TableFilter:
r.tableFilter = newTableFilterReader(fp, r.opts.FilterMetricsTracker)
r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker)
default:
return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype))
}
Expand Down Expand Up @@ -945,17 +958,20 @@ func NewReader(f objstorage.Readable, o ReaderOptions) (*Reader, error) {
}
o = o.ensureDefaults()
r := &Reader{
readable: f,
opts: o,
}
cacheOpts := &r.opts.internal.CacheOpts
if cacheOpts.Cache == nil {
cacheOpts.Cache = cache.New(0)
readable: f,
cacheOpts: o.internal.CacheOpts,
loadBlockSema: o.LoadBlockSema,
deniedUserProperties: o.DeniedUserProperties,
filterMetricsTracker: o.FilterMetricsTracker,
logger: o.LoggerAndTracer,
}
if r.cacheOpts.Cache == nil {
r.cacheOpts.Cache = cache.New(0)
} else {
r.opts.internal.CacheOpts.Cache.Ref()
r.cacheOpts.Cache.Ref()
}
if cacheOpts.CacheID == 0 {
cacheOpts.CacheID = cacheOpts.Cache.NewID()
if r.cacheOpts.CacheID == 0 {
r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID()
}

var preallocRH objstorageprovider.PreallocatedReadHandle
Expand All @@ -972,7 +988,7 @@ func NewReader(f objstorage.Readable, o ReaderOptions) (*Reader, error) {
r.checksumType = footer.checksum
r.tableFormat = footer.format
// Read the metaindex and properties blocks.
if err := r.readMetaindex(footer.metaindexBH, rh); err != nil {
if err := r.readMetaindex(footer.metaindexBH, rh, o.Filters); err != nil {
r.err = err
return nil, r.Close()
}
Expand All @@ -992,17 +1008,17 @@ func NewReader(f objstorage.Readable, o ReaderOptions) (*Reader, error) {
r.Split = comparer.Split
} else {
r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
errors.Safe(r.opts.internal.CacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName))
}

if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" {
if o.MergerName == mergerName {
// r.opts.Merge is ok.
} else if m, ok := o.Mergers[mergerName]; ok {
r.opts.Merge = m.Merge
if o.Merger != nil && o.Merger.Name == mergerName {
// opts.Merger matches.
} else if _, ok := o.Mergers[mergerName]; ok {
// Known merger.
} else {
r.err = errors.Errorf("pebble/table: %d: unknown merger %s",
errors.Safe(r.opts.internal.CacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName))
}
}

Expand Down
2 changes: 1 addition & 1 deletion sstable/reader_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (i *compactionIterator) String() string {
if i.vState != nil {
return i.vState.fileNum.String()
}
return i.reader.opts.internal.CacheOpts.FileNum.String()
return i.reader.cacheOpts.FileNum.String()
}

func (i *compactionIterator) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
Expand Down
2 changes: 1 addition & 1 deletion sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,7 @@ func (i *singleLevelIterator) String() string {
if i.vState != nil {
return i.vState.fileNum.String()
}
return i.reader.opts.internal.CacheOpts.FileNum.String()
return i.reader.cacheOpts.FileNum.String()
}

// DebugTree is part of the InternalIterator interface.
Expand Down
2 changes: 1 addition & 1 deletion testdata/ingest
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Virtual tables: 0 (0B)
Local tables size: 569B
Compression types: snappy: 1
Block cache: 6 entries (945B) hit rate: 30.8%
Table cache: 1 entries (784B) hit rate: 50.0%
Table cache: 1 entries (728B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down
18 changes: 9 additions & 9 deletions testdata/metrics
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Virtual tables: 0 (0B)
Local tables size: 589B
Compression types: snappy: 1
Block cache: 3 entries (484B) hit rate: 0.0%
Table cache: 1 entries (784B) hit rate: 0.0%
Table cache: 1 entries (728B) hit rate: 0.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 1
Expand Down Expand Up @@ -131,7 +131,7 @@ Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 5 entries (946B) hit rate: 33.3%
Table cache: 2 entries (1.5KB) hit rate: 66.7%
Table cache: 2 entries (1.4KB) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 2
Expand Down Expand Up @@ -174,7 +174,7 @@ Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 5 entries (946B) hit rate: 33.3%
Table cache: 2 entries (1.5KB) hit rate: 66.7%
Table cache: 2 entries (1.4KB) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 2
Expand Down Expand Up @@ -214,7 +214,7 @@ Virtual tables: 0 (0B)
Local tables size: 595B
Compression types: snappy: 1
Block cache: 3 entries (484B) hit rate: 33.3%
Table cache: 1 entries (784B) hit rate: 66.7%
Table cache: 1 entries (728B) hit rate: 66.7%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 1
Expand Down Expand Up @@ -488,7 +488,7 @@ Virtual tables: 0 (0B)
Local tables size: 4.3KB
Compression types: snappy: 7
Block cache: 12 entries (1.9KB) hit rate: 9.1%
Table cache: 1 entries (784B) hit rate: 53.8%
Table cache: 1 entries (728B) hit rate: 53.8%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -551,7 +551,7 @@ Virtual tables: 0 (0B)
Local tables size: 6.1KB
Compression types: snappy: 10
Block cache: 12 entries (1.9KB) hit rate: 9.1%
Table cache: 1 entries (784B) hit rate: 53.8%
Table cache: 1 entries (728B) hit rate: 53.8%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -822,7 +822,7 @@ Virtual tables: 0 (0B)
Local tables size: 0B
Compression types: snappy: 1
Block cache: 1 entries (440B) hit rate: 0.0%
Table cache: 1 entries (784B) hit rate: 0.0%
Table cache: 1 entries (728B) hit rate: 0.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -869,7 +869,7 @@ Virtual tables: 0 (0B)
Local tables size: 0B
Compression types: snappy: 2
Block cache: 6 entries (996B) hit rate: 0.0%
Table cache: 1 entries (784B) hit rate: 50.0%
Table cache: 1 entries (728B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down Expand Up @@ -917,7 +917,7 @@ Virtual tables: 0 (0B)
Local tables size: 589B
Compression types: snappy: 3
Block cache: 6 entries (996B) hit rate: 0.0%
Table cache: 1 entries (784B) hit rate: 50.0%
Table cache: 1 entries (728B) hit rate: 50.0%
Secondary cache: 0 entries (0B) hit rate: 0.0%
Snapshots: 0 earliest seq num: 0
Table iters: 0
Expand Down

0 comments on commit 26fe5a0

Please sign in to comment.