Skip to content

Commit

Permalink
sstable: convert RawWriter into an interface
Browse files Browse the repository at this point in the history
Convert RawWRiter into an interface that's satisfied by the existing
RawRowWriter type. Future work will introduce a RawColWriter that writes
sstables with columnar blocks.
  • Loading branch information
jbowens committed Aug 23, 2024
1 parent 583859f commit 3d1f61a
Show file tree
Hide file tree
Showing 19 changed files with 215 additions and 208 deletions.
2 changes: 1 addition & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2666,7 +2666,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
// compaction or flush.
func (d *DB) newCompactionOutput(
jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
) (objstorage.ObjectMetadata, *sstable.RawWriter, CPUWorkHandle, error) {
) (objstorage.ObjectMetadata, *sstable.RawRowWriter, CPUWorkHandle, error) {
d.mu.Lock()
diskFileNum := d.mu.versions.getNextDiskFileNum()
d.mu.Unlock()
Expand Down
8 changes: 4 additions & 4 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ func runBuildRemoteCmd(td *datadriven.TestData, d *DB, storage remote.Storage) e
for kv := iter.First(); kv != nil; kv = iter.Next() {
tmp := kv.K
tmp.SetSeqNum(0)
if err := w.Raw().Add(tmp, kv.InPlaceValue()); err != nil {
if err := w.Raw().AddWithForceObsolete(tmp, kv.InPlaceValue(), false); err != nil {
return err
}
}
Expand All @@ -599,7 +599,7 @@ func runBuildRemoteCmd(td *datadriven.TestData, d *DB, storage remote.Storage) e
for ; s != nil && err == nil; s, err = rdi.Next() {
err = rangedel.Encode(*s, func(k base.InternalKey, v []byte) error {
k.SetSeqNum(0)
return w.Raw().Add(k, v)
return w.Raw().AddWithForceObsolete(k, v, false)
})
if err != nil {
return err
Expand Down Expand Up @@ -680,7 +680,7 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
for kv := iter.First(); kv != nil; kv = iter.Next() {
tmp := kv.K
tmp.SetSeqNum(0)
if err := w.Raw().Add(tmp, kv.InPlaceValue()); err != nil {
if err := w.Raw().AddWithForceObsolete(tmp, kv.InPlaceValue(), false); err != nil {
return err
}
}
Expand All @@ -693,7 +693,7 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
for ; s != nil && err == nil; s, err = rdi.Next() {
err = rangedel.Encode(*s, func(k base.InternalKey, v []byte) error {
k.SetSeqNum(0)
return w.Raw().Add(k, v)
return w.Raw().AddWithForceObsolete(k, v, false)
})
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3224,7 +3224,7 @@ func TestIngestFileNumReuseCrash(t *testing.T) {
func TestIngest_UpdateSequenceNumber(t *testing.T) {
mem := vfs.NewMem()
cmp := base.DefaultComparer.Compare
parse := func(input string) (*sstable.RawWriter, error) {
parse := func(input string) (*sstable.RawRowWriter, error) {
f, err := mem.Create("ext", vfs.WriteCategoryUnspecified)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions internal/compact/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (r *Runner) MoreDataToWrite() bool {
// Result.Tables. Should only be called if MoreDataToWrite() returned true.
//
// WriteTable always closes the Writer.
func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.RawWriter) {
func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.RawRowWriter) {
if r.err != nil {
panic("error already encountered")
}
Expand All @@ -159,7 +159,7 @@ func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.RawWr
r.tables[len(r.tables)-1].WriterMeta = *writerMeta
}

func (r *Runner) writeKeysToTable(tw *sstable.RawWriter) (splitKey []byte, _ error) {
func (r *Runner) writeKeysToTable(tw *sstable.RawRowWriter) (splitKey []byte, _ error) {
firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
if r.key != nil && firstKey == nil {
firstKey = r.key.UserKey
Expand Down
2 changes: 1 addition & 1 deletion internal/compact/spans.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (c *RangeKeySpanCompactor) elideInLastStripe(
//
// The span can contain either only RANGEDEL keys or only range keys.
func SplitAndEncodeSpan(
cmp base.Compare, span *keyspan.Span, upToKey []byte, tw *sstable.RawWriter,
cmp base.Compare, span *keyspan.Span, upToKey []byte, tw *sstable.RawRowWriter,
) error {
if span.Empty() {
return nil
Expand Down
2 changes: 1 addition & 1 deletion level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func buildLevelIterTables(
files[i] = f
}

writers := make([]*sstable.RawWriter, len(files))
writers := make([]*sstable.RawRowWriter, len(files))
for i := range files {
writers[i] = sstable.NewRawWriter(objstorageprovider.NewFileWritable(files[i]), sstable.WriterOptions{
BlockRestartInterval: restartInterval,
Expand Down
4 changes: 2 additions & 2 deletions merging_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func buildMergingIterTables(
files[i] = f
}

writers := make([]*sstable.RawWriter, len(files))
writers := make([]*sstable.RawRowWriter, len(files))
for i := range files {
writers[i] = sstable.NewRawWriter(objstorageprovider.NewFileWritable(files[i]), sstable.WriterOptions{
BlockRestartInterval: restartInterval,
Expand Down Expand Up @@ -543,7 +543,7 @@ func buildLevelsForMergingIterSeqSeek(
}

const targetL6FirstFileSize = 2 << 20
writers := make([][]*sstable.RawWriter, levelCount)
writers := make([][]*sstable.RawRowWriter, levelCount)
// A policy unlikely to have false positives.
filterPolicy := bloom.FilterPolicy(100)
for i := range files {
Expand Down
2 changes: 1 addition & 1 deletion metamorphic/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func writeSSTForIngestion(
if err != nil {
return nil, err
}
if err := w.Raw().Add(k.K, valBytes); err != nil {
if err := w.Raw().AddWithForceObsolete(k.K, valBytes, false); err != nil {
return nil, err
}
}
Expand Down
4 changes: 2 additions & 2 deletions metamorphic/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ func (r *replicateOp) runSharedReplicate(
if err != nil {
panic(err)
}
return w.Raw().Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
return w.Raw().AddWithForceObsolete(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false)
},
func(start, end []byte, seqNum base.SeqNum) error {
return w.DeleteRange(start, end)
Expand Down Expand Up @@ -1991,7 +1991,7 @@ func (r *replicateOp) runExternalReplicate(
if err != nil {
panic(err)
}
return w.Raw().Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
return w.Raw().AddWithForceObsolete(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false)
},
func(start, end []byte, seqNum base.SeqNum) error {
return w.DeleteRange(start, end)
Expand Down
2 changes: 1 addition & 1 deletion scan_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func TestScanInternal(t *testing.T) {
var err error
value, _, err = kv.Value(value)
require.NoError(t, err)
require.NoError(t, w.Raw().Add(kv.K, value))
require.NoError(t, w.Raw().AddWithForceObsolete(kv.K, value, false))
}
points.Close()
require.NoError(t, w.Close())
Expand Down
18 changes: 3 additions & 15 deletions sstable/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func runBuildCmd(
for _, v := range rangeDels {
for _, k := range v.Keys {
ik := base.InternalKey{UserKey: v.Start, Trailer: k.Trailer}
if err := w.Raw().Add(ik, v.End); err != nil {
if err := w.Raw().AddWithForceObsolete(ik, v.End, false); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -227,20 +227,8 @@ func runBuildRawCmd(
j := strings.Index(data, ":")
key := base.ParseInternalKey(data[:j])
value := []byte(data[j+1:])
switch key.Kind() {
case base.InternalKeyKindRangeKeyDelete,
base.InternalKeyKindRangeKeyUnset,
base.InternalKeyKindRangeKeySet:
// Note: specifying range keys directly (instead of spans) should only be
// done to check for error handling; they will not contribute to block
// properties.
if err := w.Raw().addRangeKey(key, value); err != nil {
return nil, nil, err
}
default:
if err := w.Raw().Add(key, value); err != nil {
return nil, nil, err
}
if err := w.Raw().AddWithForceObsolete(key, value, false); err != nil {
return nil, nil, err
}
}
if err := w.Close(); err != nil {
Expand Down
Loading

0 comments on commit 3d1f61a

Please sign in to comment.