diff --git a/sstable/reader.go b/sstable/reader.go index 029fe0c07a..e10c03bc1f 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -100,10 +100,10 @@ type Reader struct { filterMetricsTracker *FilterMetricsTracker logger base.LoggerAndTracer + Comparer *base.Comparer Compare Compare SuffixCmp CompareSuffixes Equal Equal - FormatKey base.FormatKey Split Split tableFilter *tableFilterReader @@ -979,16 +979,16 @@ func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Re r.footerBH = footer.footerBH if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName { + r.Comparer = o.Comparer r.Compare = o.Comparer.Compare r.SuffixCmp = o.Comparer.CompareSuffixes r.Equal = o.Comparer.Equal - r.FormatKey = o.Comparer.FormatKey r.Split = o.Comparer.Split } else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok { + r.Comparer = o.Comparer r.Compare = comparer.Compare r.SuffixCmp = comparer.CompareSuffixes r.Equal = comparer.Equal - r.FormatKey = comparer.FormatKey r.Split = comparer.Split } else { r.err = errors.Errorf("pebble/table: %d: unknown comparer %s", diff --git a/sstable/rowblk/rowblk_rewrite.go b/sstable/rowblk/rowblk_rewrite.go new file mode 100644 index 0000000000..ed370b0cf8 --- /dev/null +++ b/sstable/rowblk/rowblk_rewrite.go @@ -0,0 +1,94 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package rowblk + +import ( + "bytes" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/bytealloc" + "github.com/cockroachdb/pebble/sstable/block" +) + +// NewRewriter constructs a new rewriter. +func NewRewriter(comparer *base.Comparer, restartInterval int) *Rewriter { + rw := &Rewriter{comparer: comparer} + rw.writer.RestartInterval = restartInterval + return rw +} + +// Rewriter may be used to rewrite row-based blocks. +type Rewriter struct { + comparer *base.Comparer + // bufs resued each call to Rewrite. + writer Writer + iter Iter + scratchKey base.InternalKey + // alloc grown throughout the lifetime of the rewriter. + keyAlloc bytealloc.A +} + +// RewriteSuffixes rewrites the input block. It expects the input block to only +// contain keys with the suffix `from`. It rewrites the block to contain the +// same keys with the suffix `to`. +// +// RewriteSuffixes returns the start and end keys of the rewritten block, and the +// finished rewritten block. +func (r *Rewriter) RewriteSuffixes( + input []byte, from []byte, to []byte, +) (start, end base.InternalKey, rewritten []byte, err error) { + if err := r.iter.Init(r.comparer.Compare, r.comparer.Split, input, block.NoTransforms); err != nil { + return base.InternalKey{}, base.InternalKey{}, nil, err + } + if cap(r.writer.restarts) < int(r.iter.restarts) { + r.writer.restarts = make([]uint32, 0, r.iter.restarts) + } + if cap(r.writer.buf) == 0 { + r.writer.buf = make([]byte, 0, len(input)) + } + if cap(r.writer.restarts) < int(r.iter.numRestarts) { + r.writer.restarts = make([]uint32, 0, r.iter.numRestarts) + } + for kv := r.iter.First(); kv != nil; kv = r.iter.Next() { + if kv.Kind() != base.InternalKeyKindSet { + return base.InternalKey{}, base.InternalKey{}, nil, + errors.New("key does not have expected kind (set)") + } + si := r.comparer.Split(kv.K.UserKey) + oldSuffix := kv.K.UserKey[si:] + if !bytes.Equal(oldSuffix, from) { + return base.InternalKey{}, base.InternalKey{}, nil, + errors.Errorf("key has suffix %q, expected %q", oldSuffix, from) + } + newLen := si + len(to) + if cap(r.scratchKey.UserKey) < newLen { + r.scratchKey.UserKey = make([]byte, 0, len(kv.K.UserKey)*2+len(to)-len(from)) + } + + r.scratchKey.Trailer = kv.K.Trailer + r.scratchKey.UserKey = r.scratchKey.UserKey[:newLen] + copy(r.scratchKey.UserKey, kv.K.UserKey[:si]) + copy(r.scratchKey.UserKey[si:], to) + + // NB: for TableFormatPebblev3 and higher, since + // !iter.lazyValueHandling.hasValuePrefix, it will return the raw value + // in the block, which includes the 1-byte prefix. This is fine since bw + // also does not know about the prefix and will preserve it in bw.add. + v := kv.InPlaceValue() + r.writer.Add(r.scratchKey, v) + if start.UserKey == nil { + // Copy the first key. + start.Trailer = r.scratchKey.Trailer + r.keyAlloc, start.UserKey = r.keyAlloc.Copy(r.scratchKey.UserKey) + } + } + // Copy the last key. + end.Trailer = r.scratchKey.Trailer + r.keyAlloc, end.UserKey = r.keyAlloc.Copy(r.scratchKey.UserKey) + + r.iter = r.iter.ResetForReuse() + return start, end, r.writer.Finish(), nil +} diff --git a/sstable/suffix_rewriter.go b/sstable/suffix_rewriter.go index dc6c21878e..f588541b4d 100644 --- a/sstable/suffix_rewriter.go +++ b/sstable/suffix_rewriter.go @@ -1,3 +1,7 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + package sstable import ( @@ -112,7 +116,7 @@ func rewriteKeySuffixesInBlocks( return nil, TableFormatUnspecified, errors.Wrap(err, "reading layout") } - if err := rewriteDataBlocksToWriter(r, w, l.Data, from, to, w.split, concurrency); err != nil { + if err := rewriteDataBlocksToWriter(r, w, l.Data, from, to, concurrency); err != nil { return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting data blocks") } @@ -145,109 +149,47 @@ func rewriteKeySuffixesInBlocks( var errBadKind = errors.New("key does not have expected kind (set)") type blockWithSpan struct { - start, end InternalKey + start, end base.InternalKey physical block.PhysicalBlock } -func rewriteBlocks( +type blockRewriter interface { + RewriteSuffixes( + input []byte, from []byte, to []byte, + ) (start, end base.InternalKey, rewritten []byte, err error) +} + +func rewriteBlocks[R blockRewriter]( r *Reader, - restartInterval int, + rw R, checksumType block.ChecksumType, compression block.Compression, input []BlockHandleWithProperties, output []blockWithSpan, totalWorkers, worker int, from, to []byte, - split Split, ) error { - bw := rowblk.Writer{RestartInterval: restartInterval} - buf := blockBuf{checksummer: block.Checksummer{Type: checksumType}} var blockAlloc bytealloc.A - var keyAlloc bytealloc.A - var scratch InternalKey - + var compressedBuf []byte var inputBlock, inputBlockBuf []byte - - iter := &rowblk.Iter{} - + checksummer := block.Checksummer{Type: checksumType} // We'll assume all blocks are _roughly_ equal so round-robin static partition // of each worker doing every ith block is probably enough. for i := worker; i < len(input); i += totalWorkers { bh := input[i] - var err error inputBlock, inputBlockBuf, err = readBlockBuf(r, bh.Handle, inputBlockBuf) if err != nil { return err } - if err := iter.Init(r.Compare, r.Split, inputBlock, NoTransforms); err != nil { + var outputBlock []byte + output[i].start, output[i].end, outputBlock, err = + rw.RewriteSuffixes(inputBlock, from, to) + if err != nil { return err } - - /* - TODO(jackson): Move rewriteBlocks into rowblk. - - if cap(bw.restarts) < int(iter.restarts) { - bw.restarts = make([]uint32, 0, iter.restarts) - } - if cap(bw.buf) == 0 { - bw.buf = make([]byte, 0, len(inputBlock)) - } - if cap(bw.restarts) < int(iter.numRestarts) { - bw.restarts = make([]uint32, 0, iter.numRestarts) - } - */ - - for kv := iter.First(); kv != nil; kv = iter.Next() { - if kv.Kind() != InternalKeyKindSet { - return errBadKind - } - si := split(kv.K.UserKey) - oldSuffix := kv.K.UserKey[si:] - if !bytes.Equal(oldSuffix, from) { - err := errors.Errorf("key has suffix %q, expected %q", oldSuffix, from) - return err - } - newLen := si + len(to) - if cap(scratch.UserKey) < newLen { - scratch.UserKey = make([]byte, 0, len(kv.K.UserKey)*2+len(to)-len(from)) - } - - scratch.Trailer = kv.K.Trailer - scratch.UserKey = scratch.UserKey[:newLen] - copy(scratch.UserKey, kv.K.UserKey[:si]) - copy(scratch.UserKey[si:], to) - - // NB: for TableFormatPebblev3 and higher, since - // !iter.lazyValueHandling.hasValuePrefix, it will return the raw value - // in the block, which includes the 1-byte prefix. This is fine since bw - // also does not know about the prefix and will preserve it in bw.add. - v := kv.InPlaceValue() - if invariants.Enabled && r.tableFormat >= TableFormatPebblev3 && - kv.Kind() == InternalKeyKindSet { - if len(v) < 1 { - return errors.Errorf("value has no prefix") - } - prefix := block.ValuePrefix(v[0]) - if prefix.IsValueHandle() { - return errors.Errorf("value prefix is incorrect") - } - if prefix.SetHasSamePrefix() { - return errors.Errorf("multiple keys with same key prefix") - } - } - bw.Add(scratch, v) - if output[i].start.UserKey == nil { - keyAlloc, output[i].start = cloneKeyWithBuf(scratch, keyAlloc) - } - } - *iter = iter.ResetForReuse() - - keyAlloc, output[i].end = cloneKeyWithBuf(scratch, keyAlloc) - - finished := block.CompressAndChecksum(&buf.compressedBuf, bw.Finish(), compression, &buf.checksummer) - - // copy our finished block into the output buffer. + compressedBuf = compressedBuf[:cap(compressedBuf)] + finished := block.CompressAndChecksum(&compressedBuf, outputBlock, compression, &checksummer) output[i].physical = finished.CloneWithByteAlloc(&blockAlloc) } return nil @@ -264,12 +206,7 @@ func checkWriterFilterMatchesReader(r *Reader, w *RawWriter) error { } func rewriteDataBlocksToWriter( - r *Reader, - w *RawWriter, - data []BlockHandleWithProperties, - from, to []byte, - split Split, - concurrency int, + r *Reader, w *RawWriter, data []BlockHandleWithProperties, from, to []byte, concurrency int, ) error { if r.Properties.NumEntries == 0 { // No point keys. @@ -290,9 +227,13 @@ func rewriteDataBlocksToWriter( worker := i go func() { defer g.Done() + + rw := rowblk.NewRewriter( + r.Comparer, + w.dataBlockBuf.dataBlock.RestartInterval) err := rewriteBlocks( r, - w.dataBlockBuf.dataBlock.RestartInterval, + rw, w.blockBuf.checksummer.Type, w.compression, data, @@ -300,7 +241,6 @@ func rewriteDataBlocksToWriter( concurrency, worker, from, to, - split, ) if err != nil { errCh <- err