sstable: use block cache / readhandle for downloads
Previously, if we had some blocks in the block cache at the time of
starting a download, we would still read all blocks from object
storage. This was wasteful, especially for index/footer/etc. blocks,
which are more likely to be in cache.

This change threads the appropriate Reader through sstable.CopySpan
so that cached blocks can be used where present. Data blocks are also
read through the cache if they exist there; if not, we fall back to
the old sequence of reading and writing bytes blindly from object
storage to local disk as fast as IO allows.

Fixes #3758.
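A rough sketch of the resulting copy strategy (illustrative only; the blockSpan type and the cache-probe/IO callbacks below are hypothetical stand-ins for Pebble's internals, not its actual APIs):

	package main

	import "fmt"

	// blockSpan describes a contiguous region of the source sstable.
	type blockSpan struct {
		off, n uint64
	}

	// copyBlocks writes every block to sink, serving cache hits directly and
	// batching each run of consecutive misses into one ranged storage read.
	func copyBlocks(
		blocks []blockSpan,
		cached func(off uint64) ([]byte, bool), // block-cache probe
		readRange func(off, n uint64) []byte,   // ranged object-storage read
		sink func(p []byte),
	) {
		var misses []blockSpan // current run of consecutive cache misses
		flush := func() {
			if len(misses) == 0 {
				return
			}
			first, last := misses[0], misses[len(misses)-1]
			sink(readRange(first.off, last.off+last.n-first.off)) // one IO per run
			misses = misses[:0]
		}
		for _, b := range blocks {
			if p, ok := cached(b.off); ok {
				flush() // copy any pending misses before the cached block
				sink(p)
				continue
			}
			misses = append(misses, b)
		}
		flush() // trailing misses, if any
	}

	func main() {
		src := []byte("aabbccdd")
		blocks := []blockSpan{{0, 2}, {2, 2}, {4, 2}, {6, 2}}
		cache := map[uint64][]byte{2: []byte("bb")} // only the block at offset 2 is cached
		var dst []byte
		copyBlocks(blocks,
			func(off uint64) ([]byte, bool) { p, ok := cache[off]; return p, ok },
			func(off, n uint64) []byte { return src[off : off+n] },
			func(p []byte) { dst = append(dst, p...) },
		)
		fmt.Println(string(dst)) // aabbccdd, via one cache hit and two ranged reads
	}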
itsbilal committed Aug 13, 2024
1 parent 9f56153 commit 80a5615
Showing 4 changed files with 84 additions and 37 deletions.
17 changes: 12 additions & 5 deletions compaction.go
@@ -2287,11 +2287,18 @@ func (d *DB) runCopyCompaction(
 		}
 	}
 
-	wrote, err := sstable.CopySpan(ctx,
-		src, d.opts.MakeReaderOptions(),
-		w, d.opts.MakeWriterOptions(c.outputLevel.level, d.FormatMajorVersion().MaxTableFormat()),
-		start, end,
-	)
+	// NB: external files are always virtual.
+	var wrote uint64
+	err = d.tableCache.withVirtualReader(inputMeta.VirtualMeta(), func(r sstable.VirtualReader) error {
+		var err error
+		wrote, err = sstable.CopySpan(ctx,
+			src, r.UnsafeReader(), d.opts.MakeReaderOptions(),
+			w, d.opts.MakeWriterOptions(c.outputLevel.level, d.FormatMajorVersion().MaxTableFormat()),
+			start, end,
+		)
+		return err
+	})
+
 	src = nil // We passed src to CopySpan; it's responsible for closing it.
 	if err != nil {
 		if errors.Is(err, sstable.ErrEmptySpan) {
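Note how the CopySpan call is now wrapped in d.tableCache.withVirtualReader: the table cache pins the reader (and with it, access to already-cached blocks) for exactly the duration of the closure. The shape of this borrow pattern, sketched with hypothetical names rather than Pebble's table-cache API:

	// withResource is a hypothetical illustration of the closure-scoped borrow
	// that d.tableCache.withVirtualReader performs: acquire pins a cached
	// resource and returns a release func; fn must not retain r past the call.
	func withResource[T any](acquire func() (r T, release func()), fn func(T) error) error {
		r, release := acquire()
		defer release() // handed back to the cache even if fn fails
		return fn(r)
	}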
4 changes: 1 addition & 3 deletions objstorage/objstorage.go
@@ -375,9 +375,7 @@ type RemoteObjectToAttach struct {
 }
 
 // Copy copies the specified range from the input to the output.
-func Copy(ctx context.Context, in Readable, out Writable, offset, length uint64) error {
-	r := in.NewReadHandle(NoReadBefore)
-	r.SetupForCompaction()
+func Copy(ctx context.Context, r ReadHandle, out Writable, offset, length uint64) error {
 	buf := make([]byte, 256<<10)
 	end := offset + length
 	for offset < end {
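The signature change shifts ownership of the read handle to the caller: Copy no longer hard-codes NoReadBefore and SetupForCompaction, so callers can pass a handle that already carries readahead state and cache-hit accounting (as CopySpan does below with its preallocated handle). For reference, a self-contained sketch of the chunked copy loop, written against plain callbacks; the loop body beyond what the diff shows is assumed, not quoted:

	// copyRange sketches the chunked copy loop behind objstorage.Copy, written
	// against plain functions rather than Pebble's ReadHandle/Writable types.
	func copyRange(
		readAt func(p []byte, off int64) error, // stand-in for the read handle
		write func(p []byte) error,             // stand-in for the writable
		offset, length uint64,
	) error {
		buf := make([]byte, 256<<10) // 256 KiB chunks, matching the diff above
		end := offset + length
		for offset < end {
			n := end - offset
			if n > uint64(len(buf)) {
				n = uint64(len(buf))
			}
			if err := readAt(buf[:n], int64(offset)); err != nil {
				return err
			}
			if err := write(buf[:n]); err != nil {
				return err
			}
			offset += n
		}
		return nil
	}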
95 changes: 66 additions & 29 deletions sstable/copier.go
@@ -44,18 +44,13 @@ import (
 func CopySpan(
 	ctx context.Context,
 	input objstorage.Readable,
+	r *Reader,
 	rOpts ReaderOptions,
 	output objstorage.Writable,
 	o WriterOptions,
 	start, end InternalKey,
 ) (size uint64, _ error) {
-	r, err := NewReader(ctx, input, rOpts)
-	if err != nil {
-		input.Close()
-		output.Abort()
-		return 0, err
-	}
-	defer r.Close() // r.Close now owns calling input.Close().
+	defer input.Close()
 
 	if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
 		return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
@@ -97,6 +92,7 @@ func CopySpan(
 	rh := objstorageprovider.UsePreallocatedReadHandle(
 		r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
 	defer rh.Close()
+	rh.SetupForCompaction()
 	indexH, err := r.readIndex(ctx, rh, nil, nil)
 	if err != nil {
 		return 0, err
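With objstorage.Copy no longer calling SetupForCompaction itself (see the objstorage.go hunk above), the compaction readahead hint moves here, onto the single preallocated handle that now serves both the index reads and the data-block copies; the same handle is also what records cache hits via RecordCacheHit in the hunk below.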
@@ -150,34 +146,73 @@ func CopySpan(
 		return 0, ErrEmptySpan
 	}
 
-	// Find the span of the input file that contains all our blocks, and then copy
-	// it byte-for-byte without doing any per-key processing.
-	offset := blocks[0].bh.Offset
-
-	// The block lengths don't include their trailers, which just sit after the
-	// block length, before the next offset; We get the ones between the blocks
-	// we copy implicitly but need to explicitly add the last trailer to length.
-	length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + block.TrailerLen - offset
+	// Copy all blocks byte-for-byte without doing any per-key processing.
+	var blocksNotInCache []indexEntry
 
-	if spanEnd := length + offset; spanEnd < offset {
-		return 0, base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", offset, spanEnd)
-	}
-
-	if err := objstorage.Copy(ctx, r.readable, w.layout.writable, offset, length); err != nil {
-		return 0, err
+	copyBlocksToFile := func(blocks []indexEntry) error {
+		blockOffset := blocks[0].bh.Offset
+		// The block lengths don't include their trailers, which just sit after the
+		// block length, before the next offset; We get the ones between the blocks
+		// we copy implicitly but need to explicitly add the last trailer to length.
+		length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + block.TrailerLen - blockOffset
+		if spanEnd := length + blockOffset; spanEnd < blockOffset {
+			return base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", blockOffset, spanEnd)
+		}
+		if err := objstorage.Copy(ctx, rh, w.layout.writable, blockOffset, length); err != nil {
+			return err
+		}
+		// Update w.meta.Size so subsequently flushed metadata has correct offsets.
+		w.meta.Size += length
+		for i := range blocks {
+			blocks[i].bh.Offset = w.layout.offset
+			// blocks[i].bh.Length remains unmodified.
+			if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
+				return err
+			}
+			w.layout.offset += uint64(blocks[i].bh.Length) + block.TrailerLen
+		}
+		return nil
 	}
-	w.layout.offset += length
+	for i := range blocks {
+		h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, blocks[i].bh.Offset)
+		if h.Get() == nil {
+			// Cache miss. Add this block to the list of blocks that are not in cache.
+			blocksNotInCache = blocks[i-len(blocksNotInCache) : i+1]
+			continue
+		}
 
-	// Update w.meta.Size so subsequently flushed metadata has correct offsets.
-	w.meta.Size += length
+		// Cache hit.
+		rh.RecordCacheHit(ctx, int64(blocks[i].bh.Offset), int64(blocks[i].bh.Length+block.TrailerLen))
+		if len(blocksNotInCache) > 0 {
+			// We have some blocks that were not in cache preceding this block.
+			// Copy them using objstorage.Copy.
+			if err := copyBlocksToFile(blocksNotInCache); err != nil {
+				h.Release()
+				return 0, err
+			}
+			blocksNotInCache = nil
+		}
 
-	// Now we can setup index entries for all the blocks we just copied, pointing
-	// into the copied span.
-	for i := range blocks {
-		blocks[i].bh.Offset -= offset
+		// layout.WriteDataBlock keeps layout.offset up-to-date for us.
+		bh, err := w.layout.WriteDataBlock(h.Get(), &w.dataBlockBuf.blockBuf)
+		h.Release()
+		if err != nil {
+			return 0, err
+		}
+		blocks[i].bh.Handle = bh
 		if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
 			return 0, err
 		}
+		w.meta.Size += uint64(bh.Length) + block.TrailerLen
 	}
+
+	if len(blocksNotInCache) > 0 {
+		// We have some remaining blocks that were not in cache. Copy them
+		// using objstorage.Copy.
+		if err := copyBlocksToFile(blocksNotInCache); err != nil {
+			return 0, err
+		}
+		blocksNotInCache = nil
+	}
 
 	// TODO(dt): Copy range keys (the fact there are none is checked above).
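The one subtle line above is blocksNotInCache = blocks[i-len(blocksNotInCache) : i+1]. Because blocks are visited in order and the pending run is flushed on every cache hit, the pending misses always form a contiguous run ending just before i, so re-slicing from i-len(blocksNotInCache) extends that run to include block i without any appends. A tiny standalone demo of the invariant (hypothetical data, not Pebble code):

	package main

	import "fmt"

	func main() {
		blocks := []string{"b0", "b1", "b2", "b3", "b4"}
		hit := map[int]bool{1: true, 4: true} // pretend b1 and b4 are cached
		var misses []string
		for i := range blocks {
			if !hit[i] {
				// Extend the contiguous run of misses so that it ends at i.
				misses = blocks[i-len(misses) : i+1]
				continue
			}
			fmt.Println("flush:", misses) // flush: [b0], then flush: [b2 b3]
			misses = nil
		}
		fmt.Println("trailing:", misses) // trailing: []
	}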
@@ -282,7 +317,9 @@ func copyWholeFileBecauseOfUnsupportedFeature(
 	ctx context.Context, input objstorage.Readable, output objstorage.Writable,
 ) (size uint64, _ error) {
 	length := uint64(input.Size())
-	if err := objstorage.Copy(ctx, input, output, 0, length); err != nil {
+	rh := input.NewReadHandle(objstorage.NoReadBefore)
+	rh.SetupForCompaction()
+	if err := objstorage.Copy(ctx, rh, output, 0, length); err != nil {
 		output.Abort()
 		return 0, err
 	}
5 changes: 5 additions & 0 deletions sstable/reader_virtual.go
@@ -209,6 +209,11 @@ func (v *VirtualReader) NewRawRangeKeyIter(
 	), nil
 }
 
+// UnsafeReader returns the underlying *sstable.Reader behind a VirtualReader.
+func (v *VirtualReader) UnsafeReader() *Reader {
+	return v.reader
+}
+
 // Constrain bounds will narrow the start, end bounds if they do not fit within
 // the virtual sstable. The function will return if the new end key is
 // inclusive.
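A usage note: judging from the call site in compaction.go above, the "Unsafe" in UnsafeReader presumably flags lifetime rather than data safety — the returned *Reader appears valid only while the table-cache handle backing the VirtualReader is held, which is why runCopyCompaction calls it strictly inside the withVirtualReader closure instead of stashing the reader somewhere longer-lived.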
