Skip to content

Commit

Permalink
filter deletion in compaction filter (tikv#344)
Browse files Browse the repository at this point in the history
And delay the buffer initialization of writable file to first actual write.

---------

Signed-off-by: tabokie <[email protected]>
Signed-off-by: Yang Zhang <[email protected]>
  • Loading branch information
tabokie authored and v01dstar committed Oct 2, 2024
1 parent bc7cc71 commit 891d589
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 10 deletions.
15 changes: 9 additions & 6 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,20 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
}

if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
ikey_.type != kTypeWideColumnEntity) {
ikey_.type != kTypeWideColumnEntity && ikey_.type != kTypeDeletion) {
return true;
}

CompactionFilter::Decision decision =
CompactionFilter::Decision::kUndetermined;
CompactionFilter::ValueType value_type =
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
: ikey_.type == kTypeBlobIndex
? CompactionFilter::ValueType::kBlobIndex
: CompactionFilter::ValueType::kWideColumnEntity;
CompactionFilter::ValueType value_type = CompactionFilter::ValueType::kValue;
if (ikey_.type == kTypeBlobIndex) {
value_type = CompactionFilter::ValueType::kBlobIndex;
} else if (ikey_.type == kTypeWideColumnEntity) {
value_type = CompactionFilter::ValueType::kWideColumnEntity;
} else if (ikey_.type == kTypeDeletion) {
value_type = CompactionFilter::ValueType::kDeletion;
}

// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// to get sequence number.
Expand Down
34 changes: 34 additions & 0 deletions db/compaction/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,40 @@ TEST_P(CompactionIteratorTest, SingleMergeOperand) {
ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
}

TEST_P(CompactionIteratorTest, RemoveAllSingleDeletes) {
struct Filter : public CompactionFilter {
Decision UnsafeFilter(int /*level*/, const Slice& key, ValueType t,
const Slice& /*existing_value*/,
std::string* /*new_value*/,
std::string* skip_until) const override {
if (t == ValueType::kDeletion) {
*skip_until = key.ToString();
skip_until->back() += 1;
filtered += 1;
return Decision::kRemoveAndSkipUntil;
}
return Decision::kKeep;
}

const char* Name() const override {
return "CompactionIteratorTest.SingleDelete::Filter";
}
mutable size_t filtered = 0;
};

Filter filter;
InitIterators(
{test::KeyStr("a", 70, kTypeDeletion), test::KeyStr("a", 50, kTypeValue),
test::KeyStr("c", 70, kTypeDeletion),
test::KeyStr("c", 50, kTypeDeletion)},
{"", "a", "", ""}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
nullptr, &filter);

c_iter_->SeekToFirst();
ASSERT_TRUE(!c_iter_->Valid());
ASSERT_EQ(filter.filtered, 2);
}

// In bottommost level, values earlier than earliest snapshot can be output
// with sequence = 0.
TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
Expand Down
4 changes: 4 additions & 0 deletions file/writable_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
io_options, nullptr);
}

// The buffer initialization code previously in ctor.
if (buf_.Capacity() == 0) {
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
}
// See whether we need to enlarge the buffer to avoid the flush
if (buf_.Capacity() - buf_.CurrentSize() < left) {
for (size_t cap = buf_.Capacity();
Expand Down
3 changes: 2 additions & 1 deletion file/writable_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ class WritableFileWriter {
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
reinterpret_cast<void*>(max_buffer_size_));
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
// Moved to `Append` to reduce memory usage of unused writer.
// buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
std::for_each(listeners.begin(), listeners.end(),
[this](const std::shared_ptr<EventListener>& e) {
if (e->ShouldBeNotifiedOnFileIO()) {
Expand Down
26 changes: 23 additions & 3 deletions include/rocksdb/compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class CompactionFilter : public Customizable {
kBlobIndex,
// Wide-column entity
kWideColumnEntity,
kDeletion, // used only by TiKV's region range filter.
};

// Potential decisions that can be returned by the compaction filter's
Expand Down Expand Up @@ -254,9 +255,13 @@ class CompactionFilter : public Customizable {
case ValueType::kBlobIndex:
return Decision::kKeep;

default:
case ValueType::kDeletion:
// Should not appear in this API.
assert(false);
return Decision::kKeep;

default:
return Decision::kKeep;
}
}

Expand Down Expand Up @@ -298,8 +303,23 @@ class CompactionFilter : public Customizable {
return Decision::kKeep;
}

return FilterV2(level, key, value_type, *existing_value, new_value,
skip_until);
return UnsafeFilter(level, key, value_type, *existing_value, new_value,
skip_until);
}

// This interface is reserved for TiKV's region range filter. Only this
// interface can accept `value_type=kTypeDeletion`.
virtual Decision UnsafeFilter(int level, const Slice& key,
ValueType value_type,
const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const {
if (value_type != ValueType::kDeletion) {
return FilterV2(level, key, value_type, existing_value, new_value,
skip_until);
} else {
return Decision::kKeep;
}
}

// Internal (BlobDB) use only. Do not override in application code.
Expand Down

0 comments on commit 891d589

Please sign in to comment.