Consider range tombstone in compaction output file cutting (facebook#10802)

Summary:
This PR is the first step for Issue facebook#4811. Currently, compaction output files are cut at point keys, and the decision is made mainly in `CompactionOutputs::ShouldStopBefore()`. This makes it possible for range tombstones to cause large compactions that do not respect `max_compaction_bytes`. For example, a large range tombstone can overlap with too many files from the next level. Another example is when there is a gap between a range tombstone and another key. The first issue may be more acceptable, as a lot of data is deleted. This PR addresses the second issue by calling `ShouldStopBefore()` for range tombstone start keys. The main change is for `CompactionIterator` to emit range tombstone start keys to be processed by `CompactionOutputs`. A new `CompactionMergingIterator` is introduced and used only under `CompactionIterator` for this purpose. Further improvements after this PR include 1) cutting compaction output at some grandparent boundary key instead of at the next point key or range tombstone start key, and 2) cutting a compaction output file within a large range tombstone (it may be easier and reasonable to do this only for range tombstones at the end of a compaction output).
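
The "gap" case described in the summary can be illustrated with a small standalone model. This is only a sketch under simplifying assumptions, not the RocksDB implementation: `Entry`, `GrandparentFile`, and `CutPoints` are hypothetical names, keys are plain strings, and a grandparent file counts toward the overlap once the iterator has moved past its largest key.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified model; none of these types exist in RocksDB.
struct Entry {
  std::string user_key;
  bool is_range_del_start;  // true for a range tombstone start key
};

struct GrandparentFile {
  std::string smallest;
  std::string largest;
  uint64_t size;
};

// Returns the keys before which a new output file would be started.
// When `consider_range_del_start` is false, range tombstone start keys are
// never checked, which models the behavior before this change.
std::vector<std::string> CutPoints(
    const std::vector<Entry>& entries,
    const std::vector<GrandparentFile>& grandparents,
    uint64_t max_compaction_bytes, bool consider_range_del_start) {
  std::vector<std::string> cuts;
  std::size_t gp = 0;       // next grandparent file not yet passed
  uint64_t overlapped = 0;  // grandparent bytes covered by the current output
  bool file_open = false;
  for (const Entry& e : entries) {
    if (e.is_range_del_start && !consider_range_del_start) {
      continue;
    }
    // Count grandparent files that lie entirely before this key.
    while (gp < grandparents.size() && grandparents[gp].largest < e.user_key) {
      overlapped += grandparents[gp].size;
      ++gp;
    }
    if (file_open && overlapped > max_compaction_bytes) {
      cuts.push_back(e.user_key);  // stop the current file before this key
      overlapped = 0;
    }
    file_open = true;
  }
  return cuts;
}
```

With a point key `a`, a range tombstone starting at `m`, and two 60-byte grandparent files in the gap between them, a 100-byte limit produces a cut before `m` in this model only when start keys are considered. Otherwise the first output file keeps growing until the next point key.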

Pull Request resolved: facebook#10802

Test Plan:
- added unit tests in db_range_del_test.
- stress test: `python3 tools/db_crashtest.py whitebox --[simple|enable_ts] --verify_iterator_with_expected_state_one_in=5 --delrangepercent=5 --prefixpercent=2 --writepercent=58 --readpercent=21 --duration=36000 --range_deletion_width=1000000`

Reviewed By: ajkr, jay-zhuang

Differential Revision: D40308827

Pulled By: cbi42

fbshipit-source-id: a8fd6f70a3f09d0ef7a40e006f6c964bba8c00df
cbi42 authored and facebook-github-bot committed Dec 15, 2022
1 parent 1928902 commit f02c708
Showing 22 changed files with 847 additions and 119 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -831,6 +831,7 @@ set(SOURCES
table/get_context.cc
table/iterator.cc
table/merging_iterator.cc
table/compaction_merging_iterator.cc
table/meta_blocks.cc
table/persistent_cache_helper.cc
table/plain/plain_table_bloom.cc
1 change: 1 addition & 0 deletions HISTORY.md
@@ -3,6 +3,7 @@
### Behavior changes
* Make best-efforts recovery verify SST unique ID before Version construction (#10962)
* Introduce `epoch_number` and sort L0 files by `epoch_number` instead of `largest_seqno`. `epoch_number` represents the order of a file being flushed or ingested/imported. Compaction output file will be assigned with the minimum `epoch_number` among input files'. For L0, larger `epoch_number` indicates newer L0 file.
* Compaction output file cutting logic now considers range tombstone start keys. For example, an SST partitioner may now receive a `PartitionerRequest` for a range tombstone start key.

### Bug Fixes
* Fixed a regression in iterator where range tombstones after `iterate_upper_bound` are processed.
2 changes: 2 additions & 0 deletions TARGETS
@@ -198,6 +198,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"table/block_based/reader_common.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
"table/compaction_merging_iterator.cc",
"table/cuckoo/cuckoo_table_builder.cc",
"table/cuckoo/cuckoo_table_factory.cc",
"table/cuckoo/cuckoo_table_reader.cc",
@@ -538,6 +539,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"table/block_based/reader_common.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
"table/compaction_merging_iterator.cc",
"table/cuckoo/cuckoo_table_builder.cc",
"table/cuckoo/cuckoo_table_factory.cc",
"table/cuckoo/cuckoo_table_reader.cc",
11 changes: 11 additions & 0 deletions db/blob/blob_counting_iterator.h
@@ -123,13 +123,24 @@ class BlobCountingIterator : public InternalIterator {
return iter_->GetProperty(prop_name, prop);
}

bool IsDeleteRangeSentinelKey() const override {
return iter_->IsDeleteRangeSentinelKey();
}

private:
void UpdateAndCountBlobIfNeeded() {
assert(!iter_->Valid() || iter_->status().ok());

if (!iter_->Valid()) {
status_ = iter_->status();
return;
} else if (iter_->IsDeleteRangeSentinelKey()) {
// CompactionMergingIterator emits range tombstones, and range tombstone
// keys can be truncated at file boundaries. This means the range
// tombstone keys can have op_type kTypeBlobIndex.
// This could crash the ProcessInFlow() call below since
// value is empty for these keys.
return;
}

TEST_SYNC_POINT(
5 changes: 5 additions & 0 deletions db/compaction/clipping_iterator.h
@@ -188,6 +188,11 @@ class ClippingIterator : public InternalIterator {
return iter_->GetProperty(prop_name, prop);
}

bool IsDeleteRangeSentinelKey() const override {
assert(valid_);
return iter_->IsDeleteRangeSentinelKey();
}

private:
void UpdateValid() {
assert(!iter_->Valid() || iter_->status().ok());
3 changes: 0 additions & 3 deletions db/compaction/compaction.cc
@@ -20,9 +20,6 @@

namespace ROCKSDB_NAMESPACE {

-const uint64_t kRangeTombstoneSentinel =
-    PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
-
int sstableKeyCompare(const Comparator* user_cmp, const InternalKey& a,
const InternalKey& b) {
auto c = user_cmp->CompareWithoutTimestamp(a.user_key(), b.user_key());
2 changes: 2 additions & 0 deletions db/compaction/compaction.h
@@ -18,6 +18,8 @@ namespace ROCKSDB_NAMESPACE {
// The file contains class Compaction, as well as some helper functions
// and data structures used by the class.

const uint64_t kRangeTombstoneSentinel =
PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);
// Utility for comparing sstable boundary keys. Returns -1 if either a or b is
// null which provides the property that a==null indicates a key that is less
// than any key and b==null indicates a key that is greater than any key. Note
31 changes: 24 additions & 7 deletions db/compaction/compaction_iterator.cc
@@ -377,6 +377,7 @@ void CompactionIterator::NextFromInput() {
value_ = input_.value();
blob_value_.Reset();
iter_stats_.num_input_records++;
is_range_del_ = input_.IsDeleteRangeSentinelKey();

Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
if (!pik_status.ok()) {
@@ -396,7 +397,10 @@ void CompactionIterator::NextFromInput() {
break;
}
TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);

if (is_range_del_) {
validity_info_.SetValid(kRangeDeletion);
break;
}
// Update input statistics
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
ikey_.type == kTypeDeletionWithTimestamp) {
@@ -618,13 +622,22 @@ void CompactionIterator::NextFromInput() {

ParsedInternalKey next_ikey;
AdvanceInputIter();
while (input_.Valid() && input_.IsDeleteRangeSentinelKey() &&
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok() &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
// skip range tombstone start keys with the same user key
// since they are not "real" point keys.
AdvanceInputIter();
}

// Check whether the next key exists, is not corrupt, and is the same key
// as the single delete.
if (input_.Valid() &&
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok() &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
assert(!input_.IsDeleteRangeSentinelKey());
#ifndef NDEBUG
const Compaction* c =
compaction_ ? compaction_->real_compaction() : nullptr;
@@ -849,12 +862,14 @@ void CompactionIterator::NextFromInput() {
// Note that a deletion marker of type kTypeDeletionWithTimestamp will be
// considered to have a different user key unless the timestamp is older
// than *full_history_ts_low_.
//
// Range tombstone start keys are skipped as they are not "real" keys.
while (!IsPausingManualCompaction() && !IsShuttingDown() &&
input_.Valid() &&
(ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok()) &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
-(prev_snapshot == 0 ||
+(prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() ||
DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
AdvanceInputIter();
}
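
Both loops above skip range tombstone start keys that share the current user key, since they are not "real" point keys. A minimal standalone sketch of that skipping rule follows. `InputEntry` and `NextRealEntry` are hypothetical names introduced only for illustration, and the snapshot and timestamp checks of the actual code are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for one entry of the compaction input stream.
struct InputEntry {
  std::string user_key;
  bool is_range_del_sentinel;  // true for a range tombstone start key
};

// Returns the index of the first entry after `pos` that is not a range
// tombstone start key sharing the user key of `in[pos]`.
std::size_t NextRealEntry(const std::vector<InputEntry>& in, std::size_t pos) {
  assert(pos < in.size());
  const std::string& current = in[pos].user_key;
  std::size_t next = pos + 1;
  while (next < in.size() && in[next].is_range_del_sentinel &&
         in[next].user_key == current) {
    ++next;  // not a "real" point key for this user key, so skip it
  }
  return next;
}
```

A sentinel with a different user key is not skipped in this sketch; it stays in the stream so that it can later be considered as a possible file boundary.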
@@ -1147,10 +1162,12 @@ void CompactionIterator::DecideOutputLevel() {

void CompactionIterator::PrepareOutput() {
if (Valid()) {
-    if (ikey_.type == kTypeValue) {
-      ExtractLargeValueIfNeeded();
-    } else if (ikey_.type == kTypeBlobIndex) {
-      GarbageCollectBlobIfNeeded();
+    if (LIKELY(!is_range_del_)) {
+      if (ikey_.type == kTypeValue) {
+        ExtractLargeValueIfNeeded();
+      } else if (ikey_.type == kTypeBlobIndex) {
+        GarbageCollectBlobIfNeeded();
+      }
}

if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) {
@@ -1173,7 +1190,7 @@ void CompactionIterator::PrepareOutput() {
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge && current_key_committed_ &&
!output_to_penultimate_level_ &&
-ikey_.sequence < preserve_time_min_seqno_) {
+ikey_.sequence < preserve_time_min_seqno_ && !is_range_del_) {
if (ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
ROCKS_LOG_FATAL(
18 changes: 17 additions & 1 deletion db/compaction/compaction_iterator.h
@@ -63,6 +63,10 @@ class SequenceIterWrapper : public InternalIterator {
void SeekToLast() override { assert(false); }

uint64_t num_itered() const { return num_itered_; }
bool IsDeleteRangeSentinelKey() const override {
assert(Valid());
return inner_iter_->IsDeleteRangeSentinelKey();
}

private:
InternalKeyComparator icmp_;
@@ -242,7 +246,12 @@ class CompactionIterator {
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
inline bool Valid() const { return validity_info_.IsValid(); }
-const Slice& user_key() const { return current_user_key_; }
+const Slice& user_key() const {
+  if (UNLIKELY(is_range_del_)) {
+    return ikey_.user_key;
+  }
+  return current_user_key_;
+}
const CompactionIterationStats& iter_stats() const { return iter_stats_; }
uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
// If the current key should be placed on penultimate level, only valid if
@@ -252,6 +261,8 @@ class CompactionIterator {
}
Status InputStatus() const { return input_.status(); }

bool IsDeleteRangeSentinelKey() const { return is_range_del_; }

private:
// Processes the input stream to find the next output
void NextFromInput();
@@ -385,6 +396,7 @@ class CompactionIterator {
kKeepSD = 8,
kKeepDel = 9,
kNewUserKey = 10,
kRangeDeletion = 11,
};

struct ValidityInfo {
@@ -492,6 +504,10 @@ class CompactionIterator {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return manual_compaction_canceled_.load(std::memory_order_relaxed);
}

// Stores whether the current compaction iterator output
// is a range tombstone start key.
bool is_range_del_{false};
};

inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq,
1 change: 0 additions & 1 deletion db/compaction/compaction_job.cc
@@ -1286,7 +1286,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true.

assert(!end.has_value() || cfd->user_comparator()->Compare(
c_iter->user_key(), end.value()) < 0);

72 changes: 65 additions & 7 deletions db/compaction/compaction_outputs.cc
@@ -333,8 +333,14 @@ Status CompactionOutputs::AddToOutput(
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) {
Status s;
bool is_range_del = c_iter.IsDeleteRangeSentinelKey();
if (is_range_del && compaction_->bottommost_level()) {
// We don't consider range tombstone for bottommost level since:
// 1. there is no grandparent and hence no overlap to consider
// 2. range tombstone may be dropped at bottommost level.
return s;
}
const Slice& key = c_iter.key();

if (ShouldStopBefore(c_iter) && HasBuilder()) {
s = close_file_func(*this, c_iter.InputStatus(), key);
if (!s.ok()) {
@@ -344,6 +350,13 @@ Status CompactionOutputs::AddToOutput(
grandparent_boundary_switched_num_ = 0;
grandparent_overlapped_bytes_ =
GetCurrentKeyGrandparentOverlappedBytes(key);
if (UNLIKELY(is_range_del)) {
// lower bound for this new output file, this is needed as the lower bound
// does not come from the smallest point key in this case.
range_tombstone_lower_bound_.DecodeFrom(key);
} else {
range_tombstone_lower_bound_.Clear();
}
}

// Open output file if necessary
@@ -354,6 +367,17 @@ Status CompactionOutputs::AddToOutput(
}
}

// c_iter may emit range deletion keys, so update `last_key_for_partitioner_`
// here before returning below when `is_range_del` is true
if (partitioner_) {
last_key_for_partitioner_.assign(c_iter.user_key().data_,
c_iter.user_key().size_);
}

if (UNLIKELY(is_range_del)) {
return s;
}

assert(builder_ != nullptr);
const Slice& value = c_iter.value();
s = current_output().validator.Add(key, value);
@@ -377,11 +401,6 @@ Status CompactionOutputs::AddToOutput(
s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
ikey.type);

-if (partitioner_) {
-  last_key_for_partitioner_.assign(c_iter.user_key().data_,
-                                   c_iter.user_key().size_);
-}
-
return s;
}

@@ -398,13 +417,19 @@ Status CompactionOutputs::AddRangeDels(
std::string smallest_user_key;
const Slice *lower_bound, *upper_bound;
bool lower_bound_from_sub_compact = false;

bool lower_bound_from_range_tombstone = false;
size_t output_size = outputs_.size();
if (output_size == 1) {
// For the first output table, include range tombstones before the min
// key but after the subcompaction boundary.
lower_bound = comp_start_user_key;
lower_bound_from_sub_compact = true;
} else if (range_tombstone_lower_bound_.size() > 0) {
assert(meta.smallest.size() == 0 ||
icmp.Compare(range_tombstone_lower_bound_, meta.smallest) <= 0);
lower_bound_guard = range_tombstone_lower_bound_.user_key();
lower_bound = &lower_bound_guard;
lower_bound_from_range_tombstone = true;
} else if (meta.smallest.size() > 0) {
// For subsequent output tables, only include range tombstones from min
// key onwards since the previous file was extended to contain range
@@ -532,6 +557,39 @@ Status CompactionOutputs::AddRangeDels(
smallest_candidate =
InternalKey(*lower_bound, tombstone.seq_, kTypeRangeDeletion);
}
} else if (lower_bound_from_range_tombstone) {
// Range tombstone keys can be truncated at file boundaries of the files
// that contain them.
//
// If this lower bound is from a range tombstone key that is not
// truncated, i.e., it was not truncated when reading from the input
// files, then its sequence number and `op_type` will be
// kMaxSequenceNumber and kTypeRangeDeletion (see
// TruncatedRangeDelIterator::start_key()). In this case, when this key
// was used as the upper bound to cut the previous compaction output
// file, the previous file's largest key could have the same value as
// this key (see the upperbound logic below). To guarantee
// non-overlapping ranges between output files, we use the range
// tombstone's actual sequence number (tombstone.seq_) for the lower
// bound of this file. If this range tombstone key is truncated, then
// the previous file's largest key will be smaller than this range
// tombstone key, so we can use it as the lower bound directly.
if (ExtractInternalKeyFooter(range_tombstone_lower_bound_.Encode()) ==
kRangeTombstoneSentinel) {
if (ts_sz) {
smallest_candidate =
InternalKey(range_tombstone_lower_bound_.user_key(),
tombstone.seq_, kTypeRangeDeletion, tombstone.ts_);
} else {
smallest_candidate =
InternalKey(range_tombstone_lower_bound_.user_key(),
tombstone.seq_, kTypeRangeDeletion);
}
} else {
assert(GetInternalKeySeqno(range_tombstone_lower_bound_.Encode()) <
kMaxSequenceNumber);
smallest_candidate = range_tombstone_lower_bound_;
}
} else {
smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion);
}
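
The lower-bound choice in the `lower_bound_from_range_tombstone` branch above can be restated as a small standalone sketch. The names `Pack`, `Bound`, and `LowerBoundFor` are hypothetical, the user key is omitted, and the numeric constants are assumptions about the internal key footer layout (sequence number in the upper 56 bits, value type in the low byte); only the comparison against a "max sequence number + range deletion" sentinel is taken from the diff.

```cpp
#include <cassert>
#include <cstdint>

// Assumed footer layout: (sequence << 8) | type. These constants are
// illustrative stand-ins for kMaxSequenceNumber and kTypeRangeDeletion.
constexpr uint64_t kMaxSeq = (uint64_t{1} << 56) - 1;
constexpr uint8_t kRangeDelType = 0xF;

constexpr uint64_t Pack(uint64_t seq, uint8_t type) {
  return (seq << 8) | type;
}

// Stand-in for kRangeTombstoneSentinel.
constexpr uint64_t kSentinel = Pack(kMaxSeq, kRangeDelType);

// Sequence number and type of an output file's smallest key.
struct Bound {
  uint64_t seq;
  uint8_t type;
};

// If the stored lower bound carries the sentinel footer (an untruncated range
// tombstone start key), use the tombstone's own sequence number so that the
// new file's smallest key cannot equal the previous file's largest key.
// Otherwise the key was truncated at an input file boundary and is used as-is.
Bound LowerBoundFor(uint64_t stored_footer, uint64_t tombstone_seq) {
  if (stored_footer == kSentinel) {
    return {tombstone_seq, kRangeDelType};
  }
  assert((stored_footer >> 8) < kMaxSeq);
  return {stored_footer >> 8, static_cast<uint8_t>(stored_footer & 0xff)};
}
```

For example, in this sketch a stored footer equal to `kSentinel` with a tombstone sequence number of 42 yields a bound with sequence 42, while a truncated key with footer `Pack(7, 1)` keeps sequence 7 and type 1.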
5 changes: 5 additions & 0 deletions db/compaction/compaction_outputs.h
@@ -307,6 +307,7 @@ class CompactionOutputs {
std::unique_ptr<SstPartitioner> partitioner_;

// A flag determines if this subcompaction has been split by the cursor
// for RoundRobin compaction
bool is_split_ = false;

// We also maintain the output split key for each subcompaction to avoid
@@ -338,6 +339,10 @@ class CompactionOutputs {
// for the current output file, how many file boundaries has it crossed,
// basically number of files overlapped * 2
size_t grandparent_boundary_switched_num_ = 0;

// The smallest key of the current output file, this is set when current
// output file's smallest key is a range tombstone start key.
InternalKey range_tombstone_lower_bound_;
};

// helper struct to concatenate the last level and penultimate level outputs
5 changes: 5 additions & 0 deletions db/compaction/subcompaction_state.h
@@ -84,6 +84,11 @@ class SubcompactionState {
// Assign range dels aggregator, for each range_del, it can only be assigned
// to one output level, for per_key_placement, it's going to be the
// penultimate level.
// TODO: This does not work for per_key_placement + user-defined timestamp +
// DeleteRange() combo. If user-defined timestamp is enabled,
// it is possible for a range tombstone to belong to bottommost level (
// seqno < earliest snapshot) without being dropped (garbage collection
// for user-defined timestamp).
void AssignRangeDelAggregator(
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
if (compaction->SupportsPerKeyPlacement()) {