diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 85d1c039bd3..81e38be352b 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -229,17 +229,20 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, } if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex && - ikey_.type != kTypeWideColumnEntity) { + ikey_.type != kTypeWideColumnEntity && ikey_.type != kTypeDeletion) { return true; } CompactionFilter::Decision decision = CompactionFilter::Decision::kUndetermined; - CompactionFilter::ValueType value_type = - ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue - : ikey_.type == kTypeBlobIndex - ? CompactionFilter::ValueType::kBlobIndex - : CompactionFilter::ValueType::kWideColumnEntity; + CompactionFilter::ValueType value_type = CompactionFilter::ValueType::kValue; + if (ikey_.type == kTypeBlobIndex) { + value_type = CompactionFilter::ValueType::kBlobIndex; + } else if (ikey_.type == kTypeWideColumnEntity) { + value_type = CompactionFilter::ValueType::kWideColumnEntity; + } else if (ikey_.type == kTypeDeletion) { + value_type = CompactionFilter::ValueType::kDeletion; + } // Hack: pass internal key to BlobIndexCompactionFilter since it needs // to get sequence number. 
diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 699e629693d..7b4e8985024 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -719,6 +719,40 @@ TEST_P(CompactionIteratorTest, SingleMergeOperand) { ASSERT_EQ("cv1cv2", c_iter_->value().ToString()); } +TEST_P(CompactionIteratorTest, RemoveAllSingleDeletes) { + struct Filter : public CompactionFilter { + Decision UnsafeFilter(int /*level*/, const Slice& key, ValueType t, + const Slice& /*existing_value*/, + std::string* /*new_value*/, + std::string* skip_until) const override { + if (t == ValueType::kDeletion) { + *skip_until = key.ToString(); + skip_until->back() += 1; + filtered += 1; + return Decision::kRemoveAndSkipUntil; + } + return Decision::kKeep; + } + + const char* Name() const override { + return "CompactionIteratorTest.SingleDelete::Filter"; + } + mutable size_t filtered = 0; + }; + + Filter filter; + InitIterators( + {test::KeyStr("a", 70, kTypeDeletion), test::KeyStr("a", 50, kTypeValue), + test::KeyStr("c", 70, kTypeDeletion), + test::KeyStr("c", 50, kTypeDeletion)}, + {"", "a", "", ""}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber, + nullptr, &filter); + + c_iter_->SeekToFirst(); + ASSERT_TRUE(!c_iter_->Valid()); + ASSERT_EQ(filter.filtered, 2); +} + // In bottommost level, values earlier than earliest snapshot can be output // with sequence = 0. TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) { diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 908878a5fae..72a66b1bb2c 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -69,6 +69,10 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, io_options, nullptr); } + // The buffer initialization code previously in ctor. 
+ if (buf_.Capacity() == 0) { + buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); + } // See whether we need to enlarge the buffer to avoid the flush if (buf_.Capacity() - buf_.CurrentSize() < left) { for (size_t cap = buf_.Capacity(); diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index aac0f59491e..30c278cfb9a 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -202,7 +202,8 @@ class WritableFileWriter { TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", reinterpret_cast<void*>(max_buffer_size_)); buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); - buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); + // Moved to `Append` to reduce memory usage of unused writer. + // buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); std::for_each(listeners.begin(), listeners.end(), [this](const std::shared_ptr<EventListener>& e) { if (e->ShouldBeNotifiedOnFileIO()) { diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 410ee4d3ab0..9592b8e4a7b 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -67,6 +67,7 @@ class CompactionFilter : public Customizable { kBlobIndex, // Wide-column entity kWideColumnEntity, + kDeletion, // used only by TiKV's region range filter. }; // Potential decisions that can be returned by the compaction filter's @@ -254,9 +255,13 @@ class CompactionFilter : public Customizable { case ValueType::kBlobIndex: return Decision::kKeep; - default: + case ValueType::kDeletion: + // Should not appear in this API. 
assert(false); return Decision::kKeep; + + default: + return Decision::kKeep; } } @@ -298,8 +303,23 @@ class CompactionFilter : public Customizable { return Decision::kKeep; } - return FilterV2(level, key, value_type, *existing_value, new_value, - skip_until); + return UnsafeFilter(level, key, value_type, *existing_value, new_value, + skip_until); + } + + // This interface is reserved for TiKV's region range filter. Only this + // interface can accept `value_type=kTypeDeletion`. + virtual Decision UnsafeFilter(int level, const Slice& key, + ValueType value_type, + const Slice& existing_value, + std::string* new_value, + std::string* skip_until) const { + if (value_type != ValueType::kDeletion) { + return FilterV2(level, key, value_type, existing_value, new_value, + skip_until); + } else { + return Decision::kKeep; + } } // Internal (BlobDB) use only. Do not override in application code.