Skip to content

Commit

Permalink
sstable: add FragmentIterTransforms
Browse files Browse the repository at this point in the history
We add a separate struct for transforms passed to the fragment
iterator. This includes the option to elide keys at the same sequence
number.

We also move the underlying `block.Iter` handle initialization to the
fragment iterator constructor. It's up to the iterator to decide what
transformations to pass to that "raw" iterator. For example, prefix
and suffix synthesis will happen in the `FragmentIter` and these
options won't be passed to the `block.Iter`.
  • Loading branch information
RaduBerinde committed Jun 26, 2024
1 parent dcaec76 commit 18b7723
Show file tree
Hide file tree
Showing 22 changed files with 105 additions and 81 deletions.
6 changes: 4 additions & 2 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
if err != nil {
return nil, err
}
rangeDelIter, err = r.NewRawRangeDelIter(transforms)
rangeDelIter, err = r.NewRawRangeDelIter(sstable.FragmentIterTransforms{
SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum),
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -255,7 +257,7 @@ func finishInitializingExternal(ctx context.Context, it *Iterator) error {
}
for _, readers := range it.externalReaders {
for _, r := range readers {
transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
transforms := sstable.FragmentIterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
seqNum--
if rki, err := r.NewRawRangeKeyIter(transforms); err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func ingestLoad1(
}
}

iter, err := r.NewRawRangeDelIter(sstable.NoTransforms)
iter, err := r.NewRawRangeDelIter(sstable.NoFragmentTransforms)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -346,7 +346,7 @@ func ingestLoad1(

// Update the range-key bounds for the table.
{
iter, err := r.NewRawRangeKeyIter(sstable.NoTransforms)
iter, err := r.NewRawRangeKeyIter(sstable.NoFragmentTransforms)
if err != nil {
return nil, err
}
Expand Down
14 changes: 13 additions & 1 deletion internal/manifest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ func (m *FileMetadata) SyntheticSeqNum() sstable.SyntheticSeqNum {
return sstable.NoSyntheticSeqNum
}

// IterTransforms returns an sstable.IterTransforms that has SyntheticSeqNum set as needed.
// IterTransforms returns an sstable.IterTransforms populated according to the
// file.
func (m *FileMetadata) IterTransforms() sstable.IterTransforms {
return sstable.IterTransforms{
SyntheticSeqNum: m.SyntheticSeqNum(),
Expand All @@ -334,6 +335,17 @@ func (m *FileMetadata) IterTransforms() sstable.IterTransforms {
}
}

// FragmentIterTransforms returns an sstable.FragmentIterTransforms populated
// according to the file.
func (m *FileMetadata) FragmentIterTransforms() sstable.FragmentIterTransforms {
return sstable.FragmentIterTransforms{
SyntheticSeqNum: m.SyntheticSeqNum(),
// TODO(radu): support these.
//SyntheticSuffix: m.SyntheticSuffix,
//SyntheticPrefix: m.SyntheticPrefix,
}
}

// PhysicalFileMeta is used by functions which want a guarantee that their input
// belongs to a physical sst and not a virtual sst.
//
Expand Down
2 changes: 1 addition & 1 deletion level_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestCheckLevelsCornerCases(t *testing.T) {
newIters :=
func(_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts, _ iterKinds) (iterSet, error) {
r := readers[file.FileNum]
rangeDelIter, err := r.NewRawRangeDelIter(sstable.NoTransforms)
rangeDelIter, err := r.NewRawRangeDelIter(sstable.NoFragmentTransforms)
if err != nil {
return iterSet{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (lt *levelIterTest) newIters(
set.point = iter
}
if kinds.RangeDeletion() {
rangeDelIter, err := lt.readers[file.FileNum].NewRawRangeDelIter(transforms)
rangeDelIter, err := lt.readers[file.FileNum].NewRawRangeDelIter(file.FragmentIterTransforms())
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
}
Expand Down
4 changes: 2 additions & 2 deletions merging_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestMergingIterCornerCases(t *testing.T) {
var err error
r := readers[file.FileNum]
if kinds.RangeDeletion() {
set.rangeDeletion, err = r.NewRawRangeDelIter(sstable.NoTransforms)
set.rangeDeletion, err = r.NewRawRangeDelIter(sstable.NoFragmentTransforms)
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
}
Expand Down Expand Up @@ -667,7 +667,7 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
if err != nil {
return iterSet{}, err
}
rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter(sstable.NoTransforms)
rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter(sstable.NoFragmentTransforms)
if err != nil {
iter.Close()
return iterSet{}, err
Expand Down
4 changes: 2 additions & 2 deletions metamorphic/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func openExternalObj(
pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
panicIfErr(err)

rangeDelIter, err = reader.NewRawRangeDelIter(sstable.NoTransforms)
rangeDelIter, err = reader.NewRawRangeDelIter(sstable.NoFragmentTransforms)
panicIfErr(err)
if rangeDelIter != nil {
rangeDelIter = keyspan.Truncate(
Expand All @@ -264,7 +264,7 @@ func openExternalObj(
)
}

rangeKeyIter, err = reader.NewRawRangeKeyIter(sstable.NoTransforms)
rangeKeyIter, err = reader.NewRawRangeKeyIter(sstable.NoFragmentTransforms)
panicIfErr(err)
if rangeKeyIter != nil {
rangeKeyIter = keyspan.Truncate(
Expand Down
4 changes: 2 additions & 2 deletions replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ func loadFlushedSSTableKeys(
}

// Load all the range tombstones.
if iter, err := r.NewRawRangeDelIter(sstable.NoTransforms); err != nil {
if iter, err := r.NewRawRangeDelIter(sstable.NoFragmentTransforms); err != nil {
return err
} else if iter != nil {
defer iter.Close()
Expand All @@ -1043,7 +1043,7 @@ func loadFlushedSSTableKeys(
}

// Load all the range keys.
if iter, err := r.NewRawRangeKeyIter(sstable.NoTransforms); err != nil {
if iter, err := r.NewRawRangeKeyIter(sstable.NoFragmentTransforms); err != nil {
return err
} else if iter != nil {
defer iter.Close()
Expand Down
21 changes: 12 additions & 9 deletions sstable/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/crc"
"github.com/cockroachdb/pebble/internal/keyspan"
)

// Handle is the file offset and length of a block.
Expand Down Expand Up @@ -90,14 +89,6 @@ func (c *Checksummer) Checksum(block []byte, blockType []byte) (checksum uint32)
return checksum
}

// FragmentIterator is a keyspan iterator that iterates over a block's spans.
type FragmentIterator interface {
keyspan.FragmentIterator

// InitHandle initializes a block from the provided buffer handle.
InitHandle(base.Compare, base.Split, BufferHandle, IterTransforms) error
}

// IterTransforms allow on-the-fly transformation of data at iteration time.
//
// These transformations could in principle be implemented as block transforms
Expand All @@ -113,6 +104,18 @@ type IterTransforms struct {
// NoTransforms is the default value for IterTransforms.
var NoTransforms = IterTransforms{}

// FragmentIterTransforms allow on-the-fly transformation of range deletion or
// range key data at iteration time.
type FragmentIterTransforms struct {
SyntheticSeqNum SyntheticSeqNum
// ElideSameSeqNum, if true, returns only the first-occurring (in forward
// order) keyspan.Key for each sequence number.
ElideSameSeqNum bool
}

// NoFragmentTransforms is the default value for IterTransforms.
var NoFragmentTransforms = FragmentIterTransforms{}

// SyntheticSeqNum is used to override all sequence numbers in a table. It is
// set to a non-zero value when the table was created externally and ingested
// whole.
Expand Down
33 changes: 11 additions & 22 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,26 +387,19 @@ func (r *Reader) newCompactionIter(
//
// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
// iterator. Add WithContext methods since the existing ones are public.
func (r *Reader) NewRawRangeDelIter(transforms IterTransforms) (keyspan.FragmentIterator, error) {
func (r *Reader) NewRawRangeDelIter(
transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error) {
if r.rangeDelBH.Length == 0 {
return nil, nil
}
if transforms.SyntheticSuffix.IsSet() {
return nil, base.AssertionFailedf("synthetic suffix not supported with range del iterator")
}
if transforms.SyntheticPrefix.IsSet() {
return nil, base.AssertionFailedf("synthetic prefix not supported with range del iterator")
}
h, err := r.readRangeDel(nil /* stats */, nil /* iterStats */)
if err != nil {
return nil, err
}
i := rowblk.NewFragmentIter(true /* elideSameSeqnum */)
// It's okay for hideObsoletePoints to be false here, even for shared ingested
// sstables. This is because rangedels do not apply to points in the same
// sstable at the same sequence number anyway, so exposing obsolete rangedels
// is harmless.
if err := i.InitHandle(r.Compare, r.Split, h, transforms); err != nil {
transforms.ElideSameSeqNum = true
i, err := rowblk.NewFragmentIter(r.Compare, r.Split, h, transforms)
if err != nil {
return nil, err
}
return keyspan.MaybeAssert(i, r.Compare), nil
Expand All @@ -418,22 +411,18 @@ func (r *Reader) NewRawRangeDelIter(transforms IterTransforms) (keyspan.Fragment
//
// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
// iterator. Add WithContext methods since the existing ones are public.
func (r *Reader) NewRawRangeKeyIter(transforms IterTransforms) (keyspan.FragmentIterator, error) {
func (r *Reader) NewRawRangeKeyIter(
transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error) {
if r.rangeKeyBH.Length == 0 {
return nil, nil
}
if transforms.SyntheticSuffix.IsSet() {
return nil, base.AssertionFailedf("synthetic suffix not supported with range key iterator")
}
if transforms.SyntheticPrefix.IsSet() {
return nil, base.AssertionFailedf("synthetic prefix not supported with range key iterator")
}
h, err := r.readRangeKey(nil /* stats */, nil /* iterStats */)
if err != nil {
return nil, err
}
i := rowblk.NewFragmentIter(false /* elideSameSeqnum */)
if err := i.InitHandle(r.Compare, r.Split, h, transforms); err != nil {
i, err := rowblk.NewFragmentIter(r.Compare, r.Split, h, transforms)
if err != nil {
return nil, err
}
return keyspan.MaybeAssert(i, r.Compare), nil
Expand Down
9 changes: 7 additions & 2 deletions sstable/reader_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
// can be used by code which doesn't care to distinguish between a reader and a
// virtual reader.
type CommonReader interface {
NewRawRangeKeyIter(transforms IterTransforms) (keyspan.FragmentIterator, error)
NewRawRangeKeyIter(transforms FragmentIterTransforms) (keyspan.FragmentIterator, error)

NewRawRangeDelIter(transforms IterTransforms) (keyspan.FragmentIterator, error)
NewRawRangeDelIter(transforms FragmentIterTransforms) (keyspan.FragmentIterator, error)

NewIterWithBlockPropertyFiltersAndContextEtc(
ctx context.Context,
Expand Down Expand Up @@ -50,6 +50,8 @@ type (
BufferPool = block.BufferPool
// IterTransforms re-exports block.IterTransforms.
IterTransforms = block.IterTransforms
// FragmentIterTransforms re-exports block.FragmentIterTransforms.
FragmentIterTransforms = block.FragmentIterTransforms
// SyntheticSeqNum re-exports block.SyntheticSeqNum.
SyntheticSeqNum = block.SyntheticSeqNum
// SyntheticSuffix re-exports block.SyntheticSuffix.
Expand All @@ -61,6 +63,9 @@ type (
// NoTransforms is the default value for IterTransforms.
var NoTransforms = block.NoTransforms

// NoFragmentTransforms is the default value for FragmentIterTransforms.
var NoFragmentTransforms = block.NoFragmentTransforms

// NoSyntheticSeqNum is the default zero value for SyntheticSeqNum, which
// disables overriding the sequence number.
const NoSyntheticSeqNum = block.NoSyntheticSeqNum
14 changes: 9 additions & 5 deletions sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i

// Set during the latest virtualize command.
var v *VirtualReader
var transforms IterTransforms
var syntheticSuffix SyntheticSuffix

defer func() {
if r != nil {
Expand Down Expand Up @@ -300,11 +300,11 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i

showProps := td.HasArg("show-props")

transforms = IterTransforms{}
syntheticSuffix = nil
if td.HasArg("suffix") {
var synthSuffixStr string
td.ScanArgs(t, "suffix", &synthSuffixStr)
transforms.SyntheticSuffix = []byte(synthSuffixStr)
syntheticSuffix = []byte(synthSuffixStr)
}

params.FileNum = nextFileNum()
Expand All @@ -327,6 +327,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i
}

var rp ReaderProvider
transforms := IterTransforms{SyntheticSuffix: syntheticSuffix}
iter, err := v.NewCompactionIter(transforms, CategoryAndQoS{}, nil, rp, &bp)
if err != nil {
return err.Error()
Expand Down Expand Up @@ -366,6 +367,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i
if v == nil {
return "virtualize must be called before scan-range-del"
}
transforms := FragmentIterTransforms{} // TODO(radu): SyntheticSuffix: syntheticSuffix
iter, err := v.NewRawRangeDelIter(transforms)
if err != nil {
return err.Error()
Expand All @@ -387,8 +389,9 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i

case "scan-range-key":
if v == nil {
return "virtualize must be called before scan-range-key"
return "virtualize mupst be called before scan-range-key"
}
transforms := FragmentIterTransforms{} // TODO(radu): SyntheticSuffix: syntheticSuffix
iter, err := v.NewRawRangeKeyIter(transforms)
if err != nil {
return err.Error()
Expand Down Expand Up @@ -433,7 +436,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i
var err error
filterer, err = IntersectsTable(
[]BlockPropertyFilter{maskingFilter},
nil, wMeta.Properties.UserProperties, transforms.SyntheticSuffix,
nil, wMeta.Properties.UserProperties, syntheticSuffix,
)
if err != nil {
td.Fatalf(t, "error creating filterer: %v", err)
Expand All @@ -443,6 +446,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i
}
}

transforms := IterTransforms{SyntheticSuffix: syntheticSuffix}
iter, err := v.NewIterWithBlockPropertyFiltersAndContextEtc(
context.Background(), transforms, lower, upper, filterer, false,
&stats, CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r})
Expand Down
4 changes: 2 additions & 2 deletions sstable/reader_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (v *VirtualReader) ValidateBlockChecksumsOnBacking() error {

// NewRawRangeDelIter wraps Reader.NewRawRangeDelIter.
func (v *VirtualReader) NewRawRangeDelIter(
transforms IterTransforms,
transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error) {
iter, err := v.reader.NewRawRangeDelIter(transforms)
if err != nil {
Expand All @@ -155,7 +155,7 @@ func (v *VirtualReader) NewRawRangeDelIter(

// NewRawRangeKeyIter wraps Reader.NewRawRangeKeyIter.
func (v *VirtualReader) NewRawRangeKeyIter(
transforms IterTransforms,
transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error) {
syntheticSeqNum := transforms.SyntheticSeqNum
if v.vState.isSharedIngested {
Expand Down
Loading

0 comments on commit 18b7723

Please sign in to comment.