Skip to content

Commit

Permalink
sstable/rowblk: extract block-level suffix rewriting
Browse files Browse the repository at this point in the history
Move the suffix-rewriting logic that's specific to row-based blocks into a
rowblk.Rewriter type. In future work, a columnar equivalent will be introduced.
  • Loading branch information
jbowens committed Aug 1, 2024
1 parent 045168c commit 7f76f1a
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 91 deletions.
6 changes: 3 additions & 3 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ type Reader struct {
filterMetricsTracker *FilterMetricsTracker
logger base.LoggerAndTracer

Comparer *base.Comparer
Compare Compare
SuffixCmp CompareSuffixes
Equal Equal
FormatKey base.FormatKey
Split Split

tableFilter *tableFilterReader
Expand Down Expand Up @@ -979,16 +979,16 @@ func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Re
r.footerBH = footer.footerBH

if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
r.Comparer = o.Comparer
r.Compare = o.Comparer.Compare
r.SuffixCmp = o.Comparer.CompareSuffixes
r.Equal = o.Comparer.Equal
r.FormatKey = o.Comparer.FormatKey
r.Split = o.Comparer.Split
} else if comparer, ok := o.Comparers[r.Properties.ComparerName]; ok {
r.Comparer = o.Comparer
r.Compare = comparer.Compare
r.SuffixCmp = comparer.CompareSuffixes
r.Equal = comparer.Equal
r.FormatKey = comparer.FormatKey
r.Split = comparer.Split
} else {
r.err = errors.Errorf("pebble/table: %d: unknown comparer %s",
Expand Down
94 changes: 94 additions & 0 deletions sstable/rowblk/rowblk_rewrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package rowblk

import (
"bytes"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/bytealloc"
"github.com/cockroachdb/pebble/sstable/block"
)

// NewRewriter constructs a new rewriter.
func NewRewriter(comparer *base.Comparer, restartInterval int) *Rewriter {
rw := &Rewriter{comparer: comparer}
rw.writer.RestartInterval = restartInterval
return rw
}

// Rewriter may be used to rewrite row-based blocks.
type Rewriter struct {
comparer *base.Comparer
// bufs resued each call to Rewrite.
writer Writer
iter Iter
scratchKey base.InternalKey
// alloc grown throughout the lifetime of the rewriter.
keyAlloc bytealloc.A
}

// RewriteSuffixes rewrites the input block. It expects the input block to only
// contain keys with the suffix `from`. It rewrites the block to contain the
// same keys with the suffix `to`.
//
// RewriteSuffixes returns the start and end keys of the rewritten block, and the
// finished rewritten block.
func (r *Rewriter) RewriteSuffixes(
input []byte, from []byte, to []byte,
) (start, end base.InternalKey, rewritten []byte, err error) {
if err := r.iter.Init(r.comparer.Compare, r.comparer.Split, input, block.NoTransforms); err != nil {
return base.InternalKey{}, base.InternalKey{}, nil, err
}
if cap(r.writer.restarts) < int(r.iter.restarts) {
r.writer.restarts = make([]uint32, 0, r.iter.restarts)
}
if cap(r.writer.buf) == 0 {
r.writer.buf = make([]byte, 0, len(input))
}
if cap(r.writer.restarts) < int(r.iter.numRestarts) {
r.writer.restarts = make([]uint32, 0, r.iter.numRestarts)
}
for kv := r.iter.First(); kv != nil; kv = r.iter.Next() {
if kv.Kind() != base.InternalKeyKindSet {
return base.InternalKey{}, base.InternalKey{}, nil,
errors.New("key does not have expected kind (set)")
}
si := r.comparer.Split(kv.K.UserKey)
oldSuffix := kv.K.UserKey[si:]
if !bytes.Equal(oldSuffix, from) {
return base.InternalKey{}, base.InternalKey{}, nil,
errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
}
newLen := si + len(to)
if cap(r.scratchKey.UserKey) < newLen {
r.scratchKey.UserKey = make([]byte, 0, len(kv.K.UserKey)*2+len(to)-len(from))
}

r.scratchKey.Trailer = kv.K.Trailer
r.scratchKey.UserKey = r.scratchKey.UserKey[:newLen]
copy(r.scratchKey.UserKey, kv.K.UserKey[:si])
copy(r.scratchKey.UserKey[si:], to)

// NB: for TableFormatPebblev3 and higher, since
// !iter.lazyValueHandling.hasValuePrefix, it will return the raw value
// in the block, which includes the 1-byte prefix. This is fine since bw
// also does not know about the prefix and will preserve it in bw.add.
v := kv.InPlaceValue()
r.writer.Add(r.scratchKey, v)
if start.UserKey == nil {
// Copy the first key.
start.Trailer = r.scratchKey.Trailer
r.keyAlloc, start.UserKey = r.keyAlloc.Copy(r.scratchKey.UserKey)
}
}
// Copy the last key.
end.Trailer = r.scratchKey.Trailer
r.keyAlloc, end.UserKey = r.keyAlloc.Copy(r.scratchKey.UserKey)

r.iter = r.iter.ResetForReuse()
return start, end, r.writer.Finish(), nil
}
116 changes: 28 additions & 88 deletions sstable/suffix_rewriter.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package sstable

import (
Expand Down Expand Up @@ -112,7 +116,7 @@ func rewriteKeySuffixesInBlocks(
return nil, TableFormatUnspecified, errors.Wrap(err, "reading layout")
}

if err := rewriteDataBlocksToWriter(r, w, l.Data, from, to, w.split, concurrency); err != nil {
if err := rewriteDataBlocksToWriter(r, w, l.Data, from, to, concurrency); err != nil {
return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting data blocks")
}

Expand Down Expand Up @@ -145,109 +149,47 @@ func rewriteKeySuffixesInBlocks(
var errBadKind = errors.New("key does not have expected kind (set)")

type blockWithSpan struct {
start, end InternalKey
start, end base.InternalKey
physical block.PhysicalBlock
}

func rewriteBlocks(
type blockRewriter interface {
RewriteSuffixes(
input []byte, from []byte, to []byte,
) (start, end base.InternalKey, rewritten []byte, err error)
}

func rewriteBlocks[R blockRewriter](
r *Reader,
restartInterval int,
rw R,
checksumType block.ChecksumType,
compression block.Compression,
input []BlockHandleWithProperties,
output []blockWithSpan,
totalWorkers, worker int,
from, to []byte,
split Split,
) error {
bw := rowblk.Writer{RestartInterval: restartInterval}
buf := blockBuf{checksummer: block.Checksummer{Type: checksumType}}
var blockAlloc bytealloc.A
var keyAlloc bytealloc.A
var scratch InternalKey

var compressedBuf []byte
var inputBlock, inputBlockBuf []byte

iter := &rowblk.Iter{}

checksummer := block.Checksummer{Type: checksumType}
// We'll assume all blocks are _roughly_ equal so round-robin static partition
// of each worker doing every ith block is probably enough.
for i := worker; i < len(input); i += totalWorkers {
bh := input[i]

var err error
inputBlock, inputBlockBuf, err = readBlockBuf(r, bh.Handle, inputBlockBuf)
if err != nil {
return err
}
if err := iter.Init(r.Compare, r.Split, inputBlock, NoTransforms); err != nil {
var outputBlock []byte
output[i].start, output[i].end, outputBlock, err =
rw.RewriteSuffixes(inputBlock, from, to)
if err != nil {
return err
}

/*
TODO(jackson): Move rewriteBlocks into rowblk.
if cap(bw.restarts) < int(iter.restarts) {
bw.restarts = make([]uint32, 0, iter.restarts)
}
if cap(bw.buf) == 0 {
bw.buf = make([]byte, 0, len(inputBlock))
}
if cap(bw.restarts) < int(iter.numRestarts) {
bw.restarts = make([]uint32, 0, iter.numRestarts)
}
*/

for kv := iter.First(); kv != nil; kv = iter.Next() {
if kv.Kind() != InternalKeyKindSet {
return errBadKind
}
si := split(kv.K.UserKey)
oldSuffix := kv.K.UserKey[si:]
if !bytes.Equal(oldSuffix, from) {
err := errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
return err
}
newLen := si + len(to)
if cap(scratch.UserKey) < newLen {
scratch.UserKey = make([]byte, 0, len(kv.K.UserKey)*2+len(to)-len(from))
}

scratch.Trailer = kv.K.Trailer
scratch.UserKey = scratch.UserKey[:newLen]
copy(scratch.UserKey, kv.K.UserKey[:si])
copy(scratch.UserKey[si:], to)

// NB: for TableFormatPebblev3 and higher, since
// !iter.lazyValueHandling.hasValuePrefix, it will return the raw value
// in the block, which includes the 1-byte prefix. This is fine since bw
// also does not know about the prefix and will preserve it in bw.add.
v := kv.InPlaceValue()
if invariants.Enabled && r.tableFormat >= TableFormatPebblev3 &&
kv.Kind() == InternalKeyKindSet {
if len(v) < 1 {
return errors.Errorf("value has no prefix")
}
prefix := block.ValuePrefix(v[0])
if prefix.IsValueHandle() {
return errors.Errorf("value prefix is incorrect")
}
if prefix.SetHasSamePrefix() {
return errors.Errorf("multiple keys with same key prefix")
}
}
bw.Add(scratch, v)
if output[i].start.UserKey == nil {
keyAlloc, output[i].start = cloneKeyWithBuf(scratch, keyAlloc)
}
}
*iter = iter.ResetForReuse()

keyAlloc, output[i].end = cloneKeyWithBuf(scratch, keyAlloc)

finished := block.CompressAndChecksum(&buf.compressedBuf, bw.Finish(), compression, &buf.checksummer)

// copy our finished block into the output buffer.
compressedBuf = compressedBuf[:cap(compressedBuf)]
finished := block.CompressAndChecksum(&compressedBuf, outputBlock, compression, &checksummer)
output[i].physical = finished.CloneWithByteAlloc(&blockAlloc)
}
return nil
Expand All @@ -264,12 +206,7 @@ func checkWriterFilterMatchesReader(r *Reader, w *RawWriter) error {
}

func rewriteDataBlocksToWriter(
r *Reader,
w *RawWriter,
data []BlockHandleWithProperties,
from, to []byte,
split Split,
concurrency int,
r *Reader, w *RawWriter, data []BlockHandleWithProperties, from, to []byte, concurrency int,
) error {
if r.Properties.NumEntries == 0 {
// No point keys.
Expand All @@ -290,17 +227,20 @@ func rewriteDataBlocksToWriter(
worker := i
go func() {
defer g.Done()

rw := rowblk.NewRewriter(
r.Comparer,
w.dataBlockBuf.dataBlock.RestartInterval)
err := rewriteBlocks(
r,
w.dataBlockBuf.dataBlock.RestartInterval,
rw,
w.blockBuf.checksummer.Type,
w.compression,
data,
blocks,
concurrency,
worker,
from, to,
split,
)
if err != nil {
errCh <- err
Expand Down

0 comments on commit 7f76f1a

Please sign in to comment.