diff --git a/options.go b/options.go index 7330b3cf7c..a7542755a3 100644 --- a/options.go +++ b/options.go @@ -1954,11 +1954,8 @@ func (o *Options) MakeReaderOptions() sstable.ReaderOptions { if o != nil { readerOpts.LoadBlockSema = o.LoadBlockSema readerOpts.Comparer = o.Comparer + readerOpts.Merger = o.Merger readerOpts.Filters = o.Filters - if o.Merger != nil { - readerOpts.Merge = o.Merger.Merge - readerOpts.MergerName = o.Merger.Name - } readerOpts.LoggerAndTracer = o.LoggerAndTracer } return readerOpts diff --git a/sstable/options.go b/sstable/options.go index 1e3005ab23..37c6e19daa 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -111,8 +111,8 @@ type ReaderOptions struct { // The default value uses the same ordering as bytes.Compare. Comparer *Comparer - // Merge defines the Merge function in use for this keyspace. - Merge base.Merge + // Merger defines the Merge function in use for this keyspace. + Merger *Merger Comparers Comparers Mergers Mergers @@ -121,11 +121,6 @@ type ReaderOptions struct { // policies that are not in this map will be ignored. Filters map[string]FilterPolicy - // Merger defines the associative merge operation to use for merging values - // written with {Batch,DB}.Merge. The MergerName is checked for consistency - // with the value stored in the sstable when it was written. - MergerName string - // Logger is an optional logger and tracer. 
LoggerAndTracer base.LoggerAndTracer @@ -154,11 +149,8 @@ func (o ReaderOptions) ensureDefaults() ReaderOptions { if o.Comparer == nil { o.Comparer = base.DefaultComparer } - if o.Merge == nil { - o.Merge = base.DefaultMerger.Merge - } - if o.MergerName == "" { - o.MergerName = base.DefaultMerger.Name + if o.Merger == nil { + o.Merger = base.DefaultMerger } if o.LoggerAndTracer == nil { o.LoggerAndTracer = base.NoopLoggerAndTracer{} diff --git a/sstable/reader.go b/sstable/reader.go index 1f03cf68fe..53eab8af79 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -16,12 +16,14 @@ import ( "github.com/cespare/xxhash/v2" "github.com/cockroachdb/errors" + "github.com/cockroachdb/fifo" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/bytealloc" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/crc" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" + "github.com/cockroachdb/pebble/internal/sstableinternal" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing" @@ -87,8 +89,24 @@ type blockTransform func([]byte) ([]byte, error) // Reader is a table reader. type Reader struct { - readable objstorage.Readable - err error + readable objstorage.Readable + + // The following fields are copied from the ReaderOptions.
+ cacheOpts sstableinternal.CacheOptions + loadBlockSema *fifo.Semaphore + deniedUserProperties map[string]struct{} + filterMetricsTracker *FilterMetricsTracker + logger base.LoggerAndTracer + + Compare Compare + Equal Equal + FormatKey base.FormatKey + Split Split + + tableFilter *tableFilterReader + + err error + indexBH block.Handle filterBH block.Handle rangeDelBH block.Handle @@ -97,17 +115,11 @@ type Reader struct { propertiesBH block.Handle metaIndexBH block.Handle footerBH block.Handle - opts ReaderOptions - Compare Compare - Equal Equal - FormatKey base.FormatKey - Split Split - tableFilter *tableFilterReader - // Keep types that are not multiples of 8 bytes at the end and with - // decreasing size. + Properties Properties tableFormat TableFormat checksumType block.ChecksumType + // metaBufferPool is a buffer pool used exclusively when opening a table and // loading its meta blocks. metaBufferPoolAlloc is used to batch-allocate // the BufferPool.pool slice as a part of the Reader allocation. It's @@ -122,7 +134,7 @@ var _ CommonReader = (*Reader)(nil) // Close the reader and the underlying objstorage.Readable. func (r *Reader) Close() error { - r.opts.internal.CacheOpts.Cache.Unref() + r.cacheOpts.Cache.Unref() if r.readable != nil { r.err = firstError(r.err, r.readable.Close()) @@ -412,8 +424,7 @@ func (r *Reader) readBlock( iterStats *iterStatsAccumulator, bufferPool *block.BufferPool, ) (handle block.BufferHandle, _ error) { - cacheOpts := r.opts.internal.CacheOpts - if h := cacheOpts.Cache.Get(cacheOpts.CacheID, cacheOpts.FileNum, bh.Offset); h.Get() != nil { + if h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset); h.Get() != nil { // Cache hit. if readHandle != nil { readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+block.TrailerLen)) @@ -432,7 +443,7 @@ func (r *Reader) readBlock( // Cache miss. 
- if sema := r.opts.LoadBlockSema; sema != nil { + if sema := r.loadBlockSema; sema != nil { if err := sema.Acquire(ctx, 1); err != nil { // An error here can only come from the context. return block.BufferHandle{}, err @@ -457,8 +468,8 @@ func (r *Reader) readBlock( } // Call IsTracingEnabled to avoid the allocations of boxing integers into an // interface{}, unless necessary. - if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) { - r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s", + if readDuration >= slowReadTracingThreshold && r.logger.IsTracingEnabled(ctx) { + r.logger.Eventf(ctx, "reading %d bytes took %s", int(bh.Length+block.TrailerLen), readDuration.String()) } if stats != nil { @@ -469,7 +480,7 @@ func (r *Reader) readBlock( compressed.Release() return block.BufferHandle{}, err } - if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.opts.internal.CacheOpts.FileNum); err != nil { + if err := checkChecksum(r.checksumType, compressed.Get(), bh, r.cacheOpts.FileNum); err != nil { compressed.Release() return block.BufferHandle{}, err } @@ -514,11 +525,13 @@ func (r *Reader) readBlock( if iterStats != nil { iterStats.reportStats(bh.Length, 0, readDuration) } - h := decompressed.MakeHandle(cacheOpts.Cache, cacheOpts.CacheID, cacheOpts.FileNum, bh.Offset) + h := decompressed.MakeHandle(r.cacheOpts.Cache, r.cacheOpts.CacheID, r.cacheOpts.FileNum, bh.Offset) return h, nil } -func (r *Reader) readMetaindex(metaindexBH block.Handle, readHandle objstorage.ReadHandle) error { +func (r *Reader) readMetaindex( + metaindexBH block.Handle, readHandle objstorage.ReadHandle, filters map[string]FilterPolicy, +) error { // We use a BufferPool when reading metaindex blocks in order to avoid // populating the block cache with these blocks. 
In heavy-write workloads, // especially with high compaction concurrency, new tables may be created @@ -583,7 +596,7 @@ func (r *Reader) readMetaindex(metaindexBH block.Handle, readHandle objstorage.R return err } r.propertiesBH = bh - err := r.Properties.load(b.Get(), r.opts.DeniedUserProperties) + err := r.Properties.load(b.Get(), r.deniedUserProperties) b.Release() if err != nil { return err @@ -607,7 +620,7 @@ func (r *Reader) readMetaindex(metaindexBH block.Handle, readHandle objstorage.R r.rangeKeyBH = bh } - for name, fp := range r.opts.Filters { + for name, fp := range filters { types := []struct { ftype FilterType prefix string @@ -621,7 +634,7 @@ func (r *Reader) readMetaindex(metaindexBH block.Handle, readHandle objstorage.R switch t.ftype { case TableFilter: - r.tableFilter = newTableFilterReader(fp, r.opts.FilterMetricsTracker) + r.tableFilter = newTableFilterReader(fp, r.filterMetricsTracker) default: return base.CorruptionErrorf("unknown filter type: %v", errors.Safe(t.ftype)) } @@ -945,17 +958,20 @@ func NewReader(f objstorage.Readable, o ReaderOptions) (*Reader, error) { } o = o.ensureDefaults() r := &Reader{ - readable: f, - opts: o, - } - cacheOpts := &r.opts.internal.CacheOpts - if cacheOpts.Cache == nil { - cacheOpts.Cache = cache.New(0) + readable: f, + cacheOpts: o.internal.CacheOpts, + loadBlockSema: o.LoadBlockSema, + deniedUserProperties: o.DeniedUserProperties, + filterMetricsTracker: o.FilterMetricsTracker, + logger: o.LoggerAndTracer, + } + if r.cacheOpts.Cache == nil { + r.cacheOpts.Cache = cache.New(0) } else { - r.opts.internal.CacheOpts.Cache.Ref() + r.cacheOpts.Cache.Ref() } - if cacheOpts.CacheID == 0 { - cacheOpts.CacheID = cacheOpts.Cache.NewID() + if r.cacheOpts.CacheID == 0 { + r.cacheOpts.CacheID = r.cacheOpts.Cache.NewID() } var preallocRH objstorageprovider.PreallocatedReadHandle @@ -972,7 +988,7 @@ func NewReader(f objstorage.Readable, o ReaderOptions) (*Reader, error) { r.checksumType = footer.checksum r.tableFormat = 
footer.format // Read the metaindex and properties blocks. - if err := r.readMetaindex(footer.metaindexBH, rh); err != nil { + if err := r.readMetaindex(footer.metaindexBH, rh, o.Filters); err != nil { r.err = err return nil, r.Close() } @@ -992,17 +1008,17 @@ func NewReader(f objstorage.Readable, o ReaderOptions) (*Reader, error) { r.Split = comparer.Split } else { r.err = errors.Errorf("pebble/table: %d: unknown comparer %s", - errors.Safe(r.opts.internal.CacheOpts.FileNum), errors.Safe(r.Properties.ComparerName)) + errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.ComparerName)) } if mergerName := r.Properties.MergerName; mergerName != "" && mergerName != "nullptr" { - if o.MergerName == mergerName { - // r.opts.Merge is ok. - } else if m, ok := o.Mergers[mergerName]; ok { - r.opts.Merge = m.Merge + if o.Merger != nil && o.Merger.Name == mergerName { + // opts.Merger matches. + } else if _, ok := o.Mergers[mergerName]; ok { + // Known merger. } else { r.err = errors.Errorf("pebble/table: %d: unknown merger %s", - errors.Safe(r.opts.internal.CacheOpts.FileNum), errors.Safe(r.Properties.MergerName)) + errors.Safe(r.cacheOpts.FileNum), errors.Safe(r.Properties.MergerName)) } } diff --git a/sstable/reader_iter.go b/sstable/reader_iter.go index efbecfe46b..52cae69a59 100644 --- a/sstable/reader_iter.go +++ b/sstable/reader_iter.go @@ -163,7 +163,7 @@ func (i *compactionIterator) String() string { if i.vState != nil { return i.vState.fileNum.String() } - return i.reader.opts.internal.CacheOpts.FileNum.String() + return i.reader.cacheOpts.FileNum.String() } func (i *compactionIterator) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV { diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index 7aca2e55a0..22f8c066ff 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -1499,7 +1499,7 @@ func (i *singleLevelIterator) String() string { if i.vState != nil { return 
i.vState.fileNum.String() } - return i.reader.opts.internal.CacheOpts.FileNum.String() + return i.reader.cacheOpts.FileNum.String() } // DebugTree is part of the InternalIterator interface. diff --git a/testdata/ingest b/testdata/ingest index 59e5e39c3d..354e50e978 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -54,7 +54,7 @@ Virtual tables: 0 (0B) Local tables size: 569B Compression types: snappy: 1 Block cache: 6 entries (945B) hit rate: 30.8% -Table cache: 1 entries (784B) hit rate: 50.0% +Table cache: 1 entries (728B) hit rate: 50.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 diff --git a/testdata/metrics b/testdata/metrics index 771102af63..d583567c6b 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -75,7 +75,7 @@ Virtual tables: 0 (0B) Local tables size: 589B Compression types: snappy: 1 Block cache: 3 entries (484B) hit rate: 0.0% -Table cache: 1 entries (784B) hit rate: 0.0% +Table cache: 1 entries (728B) hit rate: 0.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 1 @@ -131,7 +131,7 @@ Virtual tables: 0 (0B) Local tables size: 595B Compression types: snappy: 1 Block cache: 5 entries (946B) hit rate: 33.3% -Table cache: 2 entries (1.5KB) hit rate: 66.7% +Table cache: 2 entries (1.4KB) hit rate: 66.7% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 2 @@ -174,7 +174,7 @@ Virtual tables: 0 (0B) Local tables size: 595B Compression types: snappy: 1 Block cache: 5 entries (946B) hit rate: 33.3% -Table cache: 2 entries (1.5KB) hit rate: 66.7% +Table cache: 2 entries (1.4KB) hit rate: 66.7% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 2 @@ -214,7 +214,7 @@ Virtual tables: 0 (0B) Local tables size: 595B Compression types: snappy: 1 Block cache: 3 entries (484B) hit rate: 33.3% -Table cache: 1 entries (784B) hit rate: 66.7% +Table cache: 1 entries (728B) hit rate: 
66.7% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 1 @@ -488,7 +488,7 @@ Virtual tables: 0 (0B) Local tables size: 4.3KB Compression types: snappy: 7 Block cache: 12 entries (1.9KB) hit rate: 9.1% -Table cache: 1 entries (784B) hit rate: 53.8% +Table cache: 1 entries (728B) hit rate: 53.8% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -551,7 +551,7 @@ Virtual tables: 0 (0B) Local tables size: 6.1KB Compression types: snappy: 10 Block cache: 12 entries (1.9KB) hit rate: 9.1% -Table cache: 1 entries (784B) hit rate: 53.8% +Table cache: 1 entries (728B) hit rate: 53.8% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -822,7 +822,7 @@ Virtual tables: 0 (0B) Local tables size: 0B Compression types: snappy: 1 Block cache: 1 entries (440B) hit rate: 0.0% -Table cache: 1 entries (784B) hit rate: 0.0% +Table cache: 1 entries (728B) hit rate: 0.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -869,7 +869,7 @@ Virtual tables: 0 (0B) Local tables size: 0B Compression types: snappy: 2 Block cache: 6 entries (996B) hit rate: 0.0% -Table cache: 1 entries (784B) hit rate: 50.0% +Table cache: 1 entries (728B) hit rate: 50.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -917,7 +917,7 @@ Virtual tables: 0 (0B) Local tables size: 589B Compression types: snappy: 3 Block cache: 6 entries (996B) hit rate: 0.0% -Table cache: 1 entries (784B) hit rate: 50.0% +Table cache: 1 entries (728B) hit rate: 50.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0