diff --git a/sstable/block/kv.go b/sstable/block/kv.go
index d27dd80dd9..21a3f3251c 100644
--- a/sstable/block/kv.go
+++ b/sstable/block/kv.go
@@ -72,5 +72,12 @@ func InPlaceValuePrefix(setHasSameKeyPrefix bool) ValuePrefix {
 // GetLazyValueForPrefixAndValueHandler is an interface for getting a LazyValue
 // from a value prefix and value.
 type GetLazyValueForPrefixAndValueHandler interface {
+	// GetLazyValueForPrefixAndValueHandle returns a LazyValue for the given value
+	// prefix and value.
+	//
+	// The result is only valid until the next call to
+	// GetLazyValueForPrefixAndValueHandle. Use LazyValue.Clone if the lifetime of
+	// the LazyValue needs to be extended. For more details, see the "memory
+	// management" comment where LazyValue is declared.
 	GetLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue
 }
diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go
index 5dcbbd744a..2321d91d3b 100644
--- a/sstable/reader_iter_single_lvl.go
+++ b/sstable/reader_iter_single_lvl.go
@@ -64,7 +64,7 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte
 	// loading. It may not actually have loaded the block, due to an error or
 	// because it was considered irrelevant.
 	dataBH block.Handle
-	vbReader *valueBlockReader
+	vbReader valueBlockReader
 	// vbRH is the read handle for value blocks, which are in a different
 	// part of the sstable than data blocks.
 	vbRH objstorage.ReadHandle
@@ -228,22 +228,13 @@ func newColumnBlockSingleLevelIterator(
 	)
 	var getLazyValuer block.GetLazyValueForPrefixAndValueHandler
 	if r.Properties.NumValueBlocks > 0 {
-		// NB: we cannot avoid this ~248 byte allocation, since valueBlockReader
-		// can outlive the singleLevelIterator due to be being embedded in a
-		// LazyValue. This consumes ~2% in microbenchmark CPU profiles, but we
-		// should only optimize this if it shows up as significant in end-to-end
-		// CockroachDB benchmarks, since it is tricky to do so. One possibility
-		// is that if many sstable iterators only get positioned at latest
-		// versions of keys, and therefore never expose a LazyValue that is
-		// separated to their callers, they can put this valueBlockReader into a
-		// sync.Pool.
-		i.vbReader = &valueBlockReader{
+		i.vbReader = valueBlockReader{
 			bpOpen: i,
 			rp: rp,
 			vbih: r.valueBIH,
 			stats: stats,
 		}
-		getLazyValuer = i.vbReader
+		getLazyValuer = &i.vbReader
 		i.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.vbRHPrealloc)
 	}
 	i.data.InitOnce(r.keySchema, i.cmp, r.Split, getLazyValuer)
@@ -301,13 +292,13 @@ func newRowBlockSingleLevelIterator(
 	// versions of keys, and therefore never expose a LazyValue that is
 	// separated to their callers, they can put this valueBlockReader into a
 	// sync.Pool.
-		i.vbReader = &valueBlockReader{
+		i.vbReader = valueBlockReader{
 			bpOpen: i,
 			rp: rp,
 			vbih: r.valueBIH,
 			stats: stats,
 		}
-		(&i.data).SetGetLazyValuer(i.vbReader)
+		(&i.data).SetGetLazyValuer(&i.vbReader)
 		i.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.vbRHPrealloc)
 	}
 	i.data.SetHasValuePrefix(true)
@@ -1600,9 +1591,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) closeInternal() error {
 	if i.bpfs != nil {
 		releaseBlockPropertiesFilterer(i.bpfs)
 	}
-	if i.vbReader != nil {
-		i.vbReader.close()
-	}
+	i.vbReader.close()
 	if i.vbRH != nil {
 		err = firstError(err, i.vbRH.Close())
 		i.vbRH = nil
diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go
index 31e38f4367..0d75b20224 100644
--- a/sstable/reader_iter_two_lvl.go
+++ b/sstable/reader_iter_two_lvl.go
@@ -186,13 +186,13 @@ func newColumnBlockTwoLevelIterator(
 		// versions of keys, and therefore never expose a LazyValue that is
 		// separated to their callers, they can put this valueBlockReader into a
 		// sync.Pool.
-		i.secondLevel.vbReader = &valueBlockReader{
+		i.secondLevel.vbReader = valueBlockReader{
 			bpOpen: &i.secondLevel,
 			rp: rp,
 			vbih: r.valueBIH,
 			stats: stats,
 		}
-		getLazyValuer = i.secondLevel.vbReader
+		getLazyValuer = &i.secondLevel.vbReader
 		i.secondLevel.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.secondLevel.vbRHPrealloc)
 	}
 	i.secondLevel.data.InitOnce(r.keySchema, r.Compare, r.Split, getLazyValuer)
@@ -249,13 +249,13 @@ func newRowBlockTwoLevelIterator(
 		// versions of keys, and therefore never expose a LazyValue that is
 		// separated to their callers, they can put this valueBlockReader into a
 		// sync.Pool.
-		i.secondLevel.vbReader = &valueBlockReader{
+		i.secondLevel.vbReader = valueBlockReader{
 			bpOpen: &i.secondLevel,
 			rp: rp,
 			vbih: r.valueBIH,
 			stats: stats,
 		}
-		i.secondLevel.data.SetGetLazyValuer(i.secondLevel.vbReader)
+		i.secondLevel.data.SetGetLazyValuer(&i.secondLevel.vbReader)
 		i.secondLevel.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.secondLevel.vbRHPrealloc)
 	}
 	i.secondLevel.data.SetHasValuePrefix(true)
diff --git a/sstable/value_block.go b/sstable/value_block.go
index 09b76fba85..27aa2ba028 100644
--- a/sstable/value_block.go
+++ b/sstable/value_block.go
@@ -721,38 +721,42 @@ func (trp *trivialReaderProvider) GetReader(ctx context.Context) (*Reader, error
 
 // Close implements ReaderProvider.
 func (trp *trivialReaderProvider) Close() {}
 
-// valueBlockReader is used to retrieve values in value
-// blocks. It is used when the sstable was written with
-// Properties.ValueBlocksAreEnabled.
+// valueBlockReader implements GetLazyValueForPrefixAndValueHandler; it is used
+// to create LazyValues (each of which can be used to retrieve a value in a
+// value block). It is used when the sstable was written with
+// Properties.ValueBlocksAreEnabled. The lifetime of this object is tied to the
+// lifetime of the sstable iterator.
 type valueBlockReader struct {
 	bpOpen blockProviderWhenOpen
 	rp ReaderProvider
 	vbih valueBlocksIndexHandle
 	stats *base.InternalIteratorStats
+	// fetcher is allocated lazily the first time we create a LazyValue, in order
+	// to avoid the allocation if we never read a lazy value (which should be the
+	// case when we're reading the latest value of a key).
+	fetcher *valueBlockFetcher
-	// The value blocks index is lazily retrieved the first time the reader
-	// needs to read a value that resides in a value block.
-	vbiBlock []byte
-	vbiCache block.BufferHandle
-	// When sequentially iterating through all key-value pairs, the cost of
-	// repeatedly getting a block that is already in the cache and releasing the
-	// bufferHandle can be ~40% of the cpu overhead. So the reader remembers the
-	// last value block it retrieved, in case there is locality of access, and
-	// this value block can be used for the next value retrieval.
-	valueBlockNum uint32
-	valueBlock []byte
-	valueBlockPtr unsafe.Pointer
-	valueCache block.BufferHandle
-	lazyFetcher base.LazyFetcher
-	closed bool
-	bufToMangle []byte
+	// lazyFetcher is the LazyFetcher value embedded in any LazyValue that was returned.
+	lazyFetcher base.LazyFetcher
 }
 
+var _ block.GetLazyValueForPrefixAndValueHandler = (*valueBlockReader)(nil)
+
 func (r *valueBlockReader) GetLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue {
+	if r.fetcher == nil {
+		// NB: we cannot avoid this allocation, since the valueBlockFetcher
+		// can outlive the singleLevelIterator due to being embedded in a
+		// LazyValue.
+		//
+		// TODO(radu): since it is a relatively small object, we could allocate
+		// multiple instances together, using a sync.Pool (each pool object would
+		// contain an array of instances, a subset of which have been given out).
+		r.fetcher = newValueBlockFetcher(r.bpOpen, r.rp, r.vbih, r.stats)
+	}
 	fetcher := &r.lazyFetcher
 	valLen, h := decodeLenFromValueHandle(handle[1:])
 	*fetcher = base.LazyFetcher{
-		Fetcher: r,
+		Fetcher: r.fetcher,
 		Attribute: base.AttributeAndLen{
 			ValueLen: int32(valLen),
 			ShortAttribute: block.ValuePrefix(handle[0]).ShortAttribute(),
@@ -770,45 +774,75 @@ func (r *valueBlockReader) GetLazyValueForPrefixAndValueHandle(handle []byte) ba
 
 func (r *valueBlockReader) close() {
 	r.bpOpen = nil
-	r.vbiBlock = nil
-	r.vbiCache.Release()
-	// Set the handle to empty since Release does not nil the Handle.value. If
-	// we were to reopen this valueBlockReader and retrieve the same
-	// Handle.value from the cache, we don't want to accidentally unref it when
-	// attempting to unref the old handle.
-	r.vbiCache = block.BufferHandle{}
-	r.valueBlock = nil
-	r.valueBlockPtr = nil
-	r.valueCache.Release()
-	// See comment above.
-	r.valueCache = block.BufferHandle{}
-	r.closed = true
-	// rp, vbih, stats remain valid, so that LazyFetcher.ValueFetcher can be
-	// implemented.
+	if r.fetcher != nil {
+		r.fetcher.close()
+		r.fetcher = nil
+	}
+}
+
+// valueBlockFetcher implements base.ValueFetcher and is used through LazyValue
+// to fetch a value from a value block. The lifetime of this object is not tied
+// to the lifetime of the iterator - a LazyValue can be accessed later.
+type valueBlockFetcher struct {
+	bpOpen blockProviderWhenOpen
+	rp ReaderProvider
+	vbih valueBlocksIndexHandle
+	stats *base.InternalIteratorStats
+	// The value blocks index is lazily retrieved the first time the reader
+	// needs to read a value that resides in a value block.
+	vbiBlock []byte
+	vbiCache block.BufferHandle
+	// When sequentially iterating through all key-value pairs, the cost of
+	// repeatedly getting a block that is already in the cache and releasing the
+	// bufferHandle can be ~40% of the cpu overhead. So the reader remembers the
+	// last value block it retrieved, in case there is locality of access, and
+	// this value block can be used for the next value retrieval.
+	valueBlockNum uint32
+	valueBlock []byte
+	valueBlockPtr unsafe.Pointer
+	valueCache block.BufferHandle
+	closed bool
+	bufToMangle []byte
+}
+
+var _ base.ValueFetcher = (*valueBlockFetcher)(nil)
+
+func newValueBlockFetcher(
+	bpOpen blockProviderWhenOpen,
+	rp ReaderProvider,
+	vbih valueBlocksIndexHandle,
+	stats *base.InternalIteratorStats,
+) *valueBlockFetcher {
+	return &valueBlockFetcher{
+		bpOpen: bpOpen,
+		rp: rp,
+		vbih: vbih,
+		stats: stats,
+	}
+}
 
 // Fetch implements base.ValueFetcher.
-func (r *valueBlockReader) Fetch(
+func (f *valueBlockFetcher) Fetch(
 	ctx context.Context, handle []byte, valLen int32, buf []byte,
 ) (val []byte, callerOwned bool, err error) {
-	if !r.closed {
-		val, err := r.getValueInternal(handle, valLen)
+	if !f.closed {
+		val, err := f.getValueInternal(handle, valLen)
 		if invariants.Enabled {
-			val = r.doValueMangling(val)
+			val = f.doValueMangling(val)
 		}
 		return val, false, err
 	}
 
-	bp := blockProviderWhenClosed{rp: r.rp}
+	bp := blockProviderWhenClosed{rp: f.rp}
 	err = bp.open(ctx)
 	if err != nil {
 		return nil, false, err
 	}
 	defer bp.close()
-	defer r.close()
-	r.bpOpen = bp
+	defer f.close()
+	f.bpOpen = bp
 	var v []byte
-	v, err = r.getValueInternal(handle, valLen)
+	v, err = f.getValueInternal(handle, valLen)
 	if err != nil {
 		return nil, false, err
 	}
@@ -816,74 +850,92 @@ func (r *valueBlockReader) Fetch(
 	return buf, true, nil
 }
 
+func (f *valueBlockFetcher) close() {
+	f.vbiBlock = nil
+	f.vbiCache.Release()
+	// Set the handle to empty since Release does not nil the Handle.value. If
+	// we were to reopen this valueBlockFetcher and retrieve the same
+	// Handle.value from the cache, we don't want to accidentally unref it when
+	// attempting to unref the old handle.
+	f.vbiCache = block.BufferHandle{}
+	f.valueBlock = nil
+	f.valueBlockPtr = nil
+	f.valueCache.Release()
+	// See comment above.
+	f.valueCache = block.BufferHandle{}
+	f.closed = true
+	// rp, vbih, stats remain valid, so that LazyFetcher.ValueFetcher can be
+	// implemented.
+}
+
 // doValueMangling attempts to uncover violations of the contract listed in
 // the declaration comment of LazyValue. It is expensive, hence only called
 // when invariants.Enabled.
-func (r *valueBlockReader) doValueMangling(v []byte) []byte {
+func (f *valueBlockFetcher) doValueMangling(v []byte) []byte {
 	// Randomly set the bytes in the previous retrieved value to 0, since
 	// property P1 only requires the valueBlockReader to maintain the memory of
 	// one fetched value.
 	if rand.IntN(2) == 0 {
-		clear(r.bufToMangle)
+		clear(f.bufToMangle)
 	}
 	// Store the current value in a new buffer for future mangling.
-	r.bufToMangle = append([]byte(nil), v...)
-	return r.bufToMangle
+	f.bufToMangle = append([]byte(nil), v...)
+	return f.bufToMangle
 }
 
-func (r *valueBlockReader) getValueInternal(handle []byte, valLen int32) (val []byte, err error) {
+func (f *valueBlockFetcher) getValueInternal(handle []byte, valLen int32) (val []byte, err error) {
 	vh := decodeRemainingValueHandle(handle)
 	vh.valueLen = uint32(valLen)
-	if r.vbiBlock == nil {
-		ch, err := r.bpOpen.readBlockForVBR(r.vbih.h, r.stats)
+	if f.vbiBlock == nil {
+		ch, err := f.bpOpen.readBlockForVBR(f.vbih.h, f.stats)
 		if err != nil {
 			return nil, err
 		}
-		r.vbiCache = ch
-		r.vbiBlock = ch.BlockData()
+		f.vbiCache = ch
+		f.vbiBlock = ch.BlockData()
 	}
-	if r.valueBlock == nil || r.valueBlockNum != vh.blockNum {
-		vbh, err := r.getBlockHandle(vh.blockNum)
+	if f.valueBlock == nil || f.valueBlockNum != vh.blockNum {
+		vbh, err := f.getBlockHandle(vh.blockNum)
 		if err != nil {
 			return nil, err
 		}
-		vbCacheHandle, err := r.bpOpen.readBlockForVBR(vbh, r.stats)
+		vbCacheHandle, err := f.bpOpen.readBlockForVBR(vbh, f.stats)
 		if err != nil {
 			return nil, err
 		}
-		r.valueBlockNum = vh.blockNum
-		r.valueCache.Release()
-		r.valueCache = vbCacheHandle
-		r.valueBlock = vbCacheHandle.BlockData()
-		r.valueBlockPtr = unsafe.Pointer(&r.valueBlock[0])
+		f.valueBlockNum = vh.blockNum
+		f.valueCache.Release()
+		f.valueCache = vbCacheHandle
+		f.valueBlock = vbCacheHandle.BlockData()
+		f.valueBlockPtr = unsafe.Pointer(&f.valueBlock[0])
 	}
-	if r.stats != nil {
-		r.stats.SeparatedPointValue.ValueBytesFetched += uint64(valLen)
+	if f.stats != nil {
+		f.stats.SeparatedPointValue.ValueBytesFetched += uint64(valLen)
 	}
-	return r.valueBlock[vh.offsetInBlock : vh.offsetInBlock+vh.valueLen], nil
+	return f.valueBlock[vh.offsetInBlock : vh.offsetInBlock+vh.valueLen], nil
 }
 
-func (r *valueBlockReader) getBlockHandle(blockNum uint32) (block.Handle, error) {
+func (f *valueBlockFetcher) getBlockHandle(blockNum uint32) (block.Handle, error) {
 	indexEntryLen :=
-		int(r.vbih.blockNumByteLength + r.vbih.blockOffsetByteLength + r.vbih.blockLengthByteLength)
+		int(f.vbih.blockNumByteLength + f.vbih.blockOffsetByteLength + f.vbih.blockLengthByteLength)
 	offsetInIndex := indexEntryLen * int(blockNum)
-	if len(r.vbiBlock) < offsetInIndex+indexEntryLen {
+	if len(f.vbiBlock) < offsetInIndex+indexEntryLen {
 		return block.Handle{}, base.AssertionFailedf(
 			"index entry out of bounds: offset %d length %d block length %d",
-			offsetInIndex, indexEntryLen, len(r.vbiBlock))
+			offsetInIndex, indexEntryLen, len(f.vbiBlock))
 	}
-	b := r.vbiBlock[offsetInIndex : offsetInIndex+indexEntryLen]
-	n := int(r.vbih.blockNumByteLength)
+	b := f.vbiBlock[offsetInIndex : offsetInIndex+indexEntryLen]
+	n := int(f.vbih.blockNumByteLength)
 	bn := littleEndianGet(b, n)
 	if uint32(bn) != blockNum {
 		return block.Handle{}, errors.Errorf("expected block num %d but found %d", blockNum, bn)
 	}
 	b = b[n:]
-	n = int(r.vbih.blockOffsetByteLength)
+	n = int(f.vbih.blockOffsetByteLength)
 	blockOffset := littleEndianGet(b, n)
 	b = b[n:]
-	n = int(r.vbih.blockLengthByteLength)
+	n = int(f.vbih.blockLengthByteLength)
 	blockLen := littleEndianGet(b, n)
 	return block.Handle{Offset: blockOffset, Length: blockLen}, nil
 }
diff --git a/sstable/writer_test.go b/sstable/writer_test.go
index 808637db2a..bac220b59c 100644
--- a/sstable/writer_test.go
+++ b/sstable/writer_test.go
@@ -378,7 +378,7 @@ func TestWriterWithValueBlocks(t *testing.T) {
 		return err.Error()
 	}
 	forceRowIterIgnoreValueBlocks := func(i *singleLevelIteratorRowBlocks) {
-		i.vbReader = nil
+		i.vbReader = valueBlockReader{}
 		i.data.SetGetLazyValuer(nil)
 		i.data.SetHasValuePrefix(false)
 	}
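
Reviewer note, not part of the patch: the lifetime contract documented on GetLazyValueForPrefixAndValueHandle is easiest to see from the caller's side. Below is a minimal sketch, assuming base.LazyValue's Clone(buf, fetcher) and Value(buf) signatures; obtainLazyValue is a hypothetical stand-in for however the caller got the LazyValue from the handler.

```go
// Sketch: extending a LazyValue past the next call to
// GetLazyValueForPrefixAndValueHandle.
lv := obtainLazyValue() // hypothetical; e.g. the current iterator value

// lv is only valid until the handler hands out its next LazyValue,
// because the handler reuses its single embedded base.LazyFetcher. To
// keep the value longer, clone it into caller-owned state.
var fetcher base.LazyFetcher // caller-owned; may outlive the iterator
lvClone, buf := lv.Clone(nil, &fetcher)

// The clone can be resolved later, even after the sstable iterator is
// closed: in that case valueBlockFetcher.Fetch reopens the reader
// through blockProviderWhenClosed.
val, callerOwned, err := lvClone.Value(nil)
_, _, _, _ = val, callerOwned, err, buf
```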
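The TODO(radu) about amortizing the fetcher allocation could take roughly the following shape. This is a hypothetical sketch only; the batch size, names, and hand-out policy are invented here, and the concurrency story would need real scrutiny.

```go
// Hypothetical: pool fixed-size batches of valueBlockFetcher and hand
// them out slot by slot. A fully handed-out batch is abandoned to the
// GC, which reclaims it once no outstanding LazyValue references any
// of its fetchers.
type fetcherBatch struct {
	fetchers [16]valueBlockFetcher // 16 is an arbitrary illustrative size
	next     int                   // next unused slot
}

var fetcherPool = sync.Pool{
	New: func() any { return &fetcherBatch{} },
}

func getPooledFetcher() *valueBlockFetcher {
	b := fetcherPool.Get().(*fetcherBatch)
	f := &b.fetchers[b.next]
	b.next++
	if b.next < len(b.fetchers) {
		// Slots remain; return the batch for later calls.
		fetcherPool.Put(b)
	}
	return f
}
```

This trades one allocation per fetcher for one per batch, at the cost of pinning a whole batch in memory until its last outstanding LazyValue is released.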
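getBlockHandle decodes fixed-width little-endian index entries: an entry lives at offset indexEntryLen * blockNum within the index block and packs (blockNum, blockOffset, blockLength) using the byte widths recorded in valueBlocksIndexHandle. A self-contained sketch with made-up widths (1-byte block number, 4-byte offset, 2-byte length); littleEndianGet here mirrors what this reviewer understands the package helper to do.

```go
package main

import "fmt"

// littleEndianGet reads an n-byte little-endian unsigned integer from b
// (an assumption about the package helper, for the purposes of this sketch).
func littleEndianGet(b []byte, n int) uint64 {
	var v uint64
	for i := n - 1; i >= 0; i-- {
		v = v<<8 | uint64(b[i])
	}
	return v
}

func main() {
	// Entry for block 3 with made-up widths: 1-byte blockNum, 4-byte
	// offset, 2-byte length => indexEntryLen = 7, so this entry would
	// start at offset 7*3 = 21 within the index block.
	entry := []byte{
		0x03,                   // blockNum = 3
		0x00, 0x10, 0x00, 0x00, // blockOffset = 4096, little-endian
		0x05, 0x02, // blockLength = 517, little-endian
	}
	bn := littleEndianGet(entry, 1)
	off := littleEndianGet(entry[1:], 4)
	length := littleEndianGet(entry[5:], 2)
	fmt.Println(bn, off, length) // 3 4096 517
}
```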