Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raftstore v2 #389

Merged
merged 18 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ set(SOURCES
db/db_impl/db_impl_experimental.cc
db/db_impl/db_impl_readonly.cc
db/db_impl/db_impl_secondary.cc
db/db_impl/db_impl_merge.cc
db/db_info_dumper.cc
db/db_iter.cc
db/dbformat.cc
Expand Down Expand Up @@ -1327,6 +1328,7 @@ if(WITH_TESTS)
db/db_memtable_test.cc
db/db_merge_operator_test.cc
db/db_merge_operand_test.cc
db/db_merge_test.cc
db/db_options_test.cc
db/db_properties_test.cc
db/db_range_del_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,9 @@ db_merge_operator_test: $(OBJ_DIR)/db/db_merge_operator_test.o $(TEST_LIBRARY) $
db_merge_operand_test: $(OBJ_DIR)/db/db_merge_operand_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_merge_test: $(OBJ_DIR)/db/db_merge_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_options_test: $(OBJ_DIR)/db/db_options_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/db_impl/db_impl_debug.cc",
"db/db_impl/db_impl_experimental.cc",
"db/db_impl/db_impl_files.cc",
"db/db_impl/db_impl_merge.cc",
"db/db_impl/db_impl_open.cc",
"db/db_impl/db_impl_readonly.cc",
"db/db_impl/db_impl_secondary.cc",
Expand Down Expand Up @@ -4862,6 +4863,12 @@ cpp_unittest_wrapper(name="db_merge_operator_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_merge_test",
srcs=["db/db_merge_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_options_test",
srcs=["db/db_options_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
18 changes: 3 additions & 15 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4944,11 +4944,6 @@ bool rocksdb_write_buffer_manager_enabled(rocksdb_write_buffer_manager_t* wbm) {
return wbm->rep->enabled();
}

bool rocksdb_write_buffer_manager_cost_to_cache(
rocksdb_write_buffer_manager_t* wbm) {
return wbm->rep->cost_to_cache();
}

size_t rocksdb_write_buffer_manager_memory_usage(
rocksdb_write_buffer_manager_t* wbm) {
return wbm->rep->memory_usage();
Expand All @@ -4963,17 +4958,10 @@ size_t rocksdb_write_buffer_manager_dummy_entries_in_cache_usage(
rocksdb_write_buffer_manager_t* wbm) {
return wbm->rep->dummy_entries_in_cache_usage();
}
size_t rocksdb_write_buffer_manager_buffer_size(

size_t rocksdb_write_buffer_manager_flush_size(
rocksdb_write_buffer_manager_t* wbm) {
return wbm->rep->buffer_size();
}
void rocksdb_write_buffer_manager_set_buffer_size(
rocksdb_write_buffer_manager_t* wbm, size_t new_size) {
wbm->rep->SetBufferSize(new_size);
}
ROCKSDB_LIBRARY_API void rocksdb_write_buffer_manager_set_allow_stall(
rocksdb_write_buffer_manager_t* wbm, bool new_allow_stall) {
wbm->rep->SetAllowStall(new_allow_stall);
return wbm->rep->flush_size();
}

rocksdb_dbpath_t* rocksdb_dbpath_create(const char* path,
Expand Down
8 changes: 1 addition & 7 deletions db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -3792,14 +3792,8 @@ int main(int argc, char** argv) {

CheckCondition(true ==
rocksdb_write_buffer_manager_enabled(write_buffer_manager));
CheckCondition(true == rocksdb_write_buffer_manager_cost_to_cache(
write_buffer_manager));
CheckCondition(
200 == rocksdb_write_buffer_manager_buffer_size(write_buffer_manager));

rocksdb_write_buffer_manager_set_buffer_size(write_buffer_manager, 300);
CheckCondition(
300 == rocksdb_write_buffer_manager_buffer_size(write_buffer_manager));
200 == rocksdb_write_buffer_manager_flush_size(write_buffer_manager));

rocksdb_write_buffer_manager_destroy(write_buffer_manager);
rocksdb_cache_destroy(lru);
Expand Down
107 changes: 106 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
for (auto& listener : cfd_->ioptions()->listeners) {
listener->OnColumnFamilyHandleDeletionStarted(this);
}
if (cfd_->write_buffer_mgr()) {
cfd_->write_buffer_mgr()->UnregisterColumnFamily(this);
}
// Job id == 0 means that this is not our background process, but rather
// user thread
// Need to hold some shared pointers owned by the initial_cf_options
Expand Down Expand Up @@ -1246,6 +1249,105 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
return status;
}

// Computes the user-key range covered by this column family's memtables
// (the mutable memtable plus all immutable memtables), including range
// tombstones. *smallest and *largest are only ever widened, and *found is
// set to true once any key has been observed, so callers can accumulate a
// range across several sources by passing the same output arguments.
Status ColumnFamilyData::GetMemtablesUserKeyRange(PinnableSlice* smallest,
PinnableSlice* largest,
bool* found) {
assert(smallest && largest && found);
Status s;
auto* ucmp = user_comparator();
Arena arena;
ReadOptions read_opts;
// Total-order seek so SeekToFirst/SeekToLast land on the true extreme user
// keys even when a prefix extractor is configured.
read_opts.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
merge_iter_builder.AddIterator(mem_->NewIterator(read_opts, &arena));
// Third argument presumably excludes range tombstones from this merged
// point iterator; tombstones are handled separately below — TODO confirm.
imm_.current()->AddIterators(read_opts, &merge_iter_builder, false);
ScopedArenaIterator mem_iter(merge_iter_builder.Finish());
// Point keys: the first/last positions of the merged memtable iterator give
// the candidate smallest/largest user keys.
mem_iter->SeekToFirst();
if (mem_iter->Valid()) {
auto ukey = mem_iter->user_key();
if (!(*found) || ucmp->Compare(ukey, *smallest) < 0) {
smallest->PinSelf(ukey);
}
mem_iter->SeekToLast();
// A valid SeekToFirst implies a non-empty iterator, so SeekToLast must be
// valid as well.
assert(mem_iter->Valid());
ukey = mem_iter->user_key();
if (!(*found) || ucmp->Compare(*largest, ukey) < 0) {
largest->PinSelf(ukey);
}
*found = true;
}

if (s.ok()) {
// Range tombstones: a deletion range [start_key, end_key) may extend
// beyond any point key, so scan each memtable's tombstone iterator too.
autovector<MemTable*> memtables{mem_};
imm_.ExportMemtables(&memtables);
for (auto* mem : memtables) {
// NOTE(review): iter appears to be heap-allocated and never deleted on
// this path — confirm ownership semantics of NewRangeTombstoneIterator.
auto* iter =
mem->NewRangeTombstoneIterator(read_opts, kMaxSequenceNumber, false);
if (iter != nullptr) {
iter->SeekToFirst();
if (iter->Valid()) {
// It's already a user key.
auto ukey = iter->start_key();
if (!(*found) || ucmp->Compare(ukey, *smallest) < 0) {
smallest->PinSelf(ukey);
}
iter->SeekToLast();
assert(iter->Valid());
// Get the end_key of all tombstones.
ukey = iter->end_key();
if (!(*found) || ucmp->Compare(*largest, ukey) < 0) {
largest->PinSelf(ukey);
}
*found = true;
}
}
}
}

return s;
}

// Computes the user-key range of ALL data in this column family: memtables
// (point keys and range tombstones, via GetMemtablesUserKeyRange) plus every
// SST file in the current version. Only level compaction is supported; other
// compaction styles return NotSupported. *smallest/*largest are widened in
// place and *found reports whether any key was seen.
Status ColumnFamilyData::GetUserKeyRange(PinnableSlice* smallest,
PinnableSlice* largest, bool* found) {
assert(smallest && largest && found);
if (ioptions_.compaction_style != CompactionStyle::kCompactionStyleLevel) {
return Status::NotSupported("Unexpected compaction style");
}
Status s = GetMemtablesUserKeyRange(smallest, largest, found);
if (!s.ok()) {
return s;
}

VersionStorageInfo& vsi = *current()->storage_info();
auto* ucmp = user_comparator();
// L0 files may overlap each other, so every file's bounds must be checked
// individually.
for (const auto& f : vsi.LevelFiles(0)) {
Slice start = f->smallest.user_key();
Slice end = f->largest.user_key();
if (!(*found) || ucmp->Compare(start, *smallest) < 0) {
smallest->PinSelf(start);
}
if (!(*found) || ucmp->Compare(*largest, end) < 0) {
largest->PinSelf(end);
}
*found = true;
}
// Levels >= 1 are sorted and non-overlapping under level compaction, so the
// first file's smallest key and the last file's largest key bound the level.
for (int level = 1; level < vsi.num_levels(); ++level) {
const auto& level_files = vsi.LevelFiles(level);
if (level_files.size() > 0) {
Slice start = level_files.front()->smallest.user_key();
Slice end = level_files.back()->largest.user_key();
if (!(*found) || ucmp->Compare(start, *smallest) < 0) {
smallest->PinSelf(start);
}
if (!(*found) || ucmp->Compare(*largest, end) < 0) {
largest->PinSelf(end);
}
*found = true;
}
}
return s;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Expand Down Expand Up @@ -1733,8 +1835,11 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
auto* write_buffer_manager = options.cf_write_buffer_manager != nullptr
? options.cf_write_buffer_manager.get()
: write_buffer_manager_;
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
id, name, dummy_versions, table_cache_, write_buffer_manager, options,
*db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
db_id_, db_session_id_);
column_families_.insert({name, id});
Expand Down
8 changes: 8 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,14 @@ class ColumnFamilyData {
SuperVersion* super_version,
bool allow_data_in_errors, bool* overlap);

// Get user key range of memtables. Tombstones are counted.
Status GetMemtablesUserKeyRange(PinnableSlice* smallest,
PinnableSlice* largest, bool* found);

// Get user key range of all data. Tombstones are counted.
Status GetUserKeyRange(PinnableSlice* smallest, PinnableSlice* largest,
bool* found);

// A flag to tell a manual compaction is to compact all levels together
// instead of a specific level.
static const int kCompactAllLevels;
Expand Down
15 changes: 9 additions & 6 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,20 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
}

if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
ikey_.type != kTypeWideColumnEntity) {
ikey_.type != kTypeWideColumnEntity && ikey_.type != kTypeDeletion) {
return true;
}

CompactionFilter::Decision decision =
CompactionFilter::Decision::kUndetermined;
CompactionFilter::ValueType value_type =
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
: ikey_.type == kTypeBlobIndex
? CompactionFilter::ValueType::kBlobIndex
: CompactionFilter::ValueType::kWideColumnEntity;
CompactionFilter::ValueType value_type = CompactionFilter::ValueType::kValue;
if (ikey_.type == kTypeBlobIndex) {
value_type = CompactionFilter::ValueType::kBlobIndex;
} else if (ikey_.type == kTypeWideColumnEntity) {
value_type = CompactionFilter::ValueType::kWideColumnEntity;
} else if (ikey_.type == kTypeDeletion) {
value_type = CompactionFilter::ValueType::kDeletion;
}

// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// to get sequence number.
Expand Down
34 changes: 34 additions & 0 deletions db/compaction/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,40 @@ TEST_P(CompactionIteratorTest, SingleMergeOperand) {
ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
}

// Verifies that a compaction filter can see kTypeDeletion entries (via the
// UnsafeFilter extension) and drop a delete together with everything it
// shadows using kRemoveAndSkipUntil, leaving no output at all.
TEST_P(CompactionIteratorTest, RemoveAllSingleDeletes) {
struct Filter : public CompactionFilter {
Decision UnsafeFilter(int /*level*/, const Slice& key, ValueType t,
const Slice& /*existing_value*/,
std::string* /*new_value*/,
std::string* skip_until) const override {
if (t == ValueType::kDeletion) {
// Skip to the key immediately after `key` (increment the last byte),
// so the deletion and all older entries for this user key vanish.
*skip_until = key.ToString();
skip_until->back() += 1;
filtered += 1;
return Decision::kRemoveAndSkipUntil;
}
return Decision::kKeep;
}

const char* Name() const override {
return "CompactionIteratorTest.SingleDelete::Filter";
}
// Counts how many deletions the filter acted on; mutable because
// UnsafeFilter is const.
mutable size_t filtered = 0;
};

Filter filter;
// Input: "a" has a delete over a value; "c" has two stacked deletes.
InitIterators(
{test::KeyStr("a", 70, kTypeDeletion), test::KeyStr("a", 50, kTypeValue),
test::KeyStr("c", 70, kTypeDeletion),
test::KeyStr("c", 50, kTypeDeletion)},
{"", "a", "", ""}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
nullptr, &filter);

c_iter_->SeekToFirst();
// Everything is filtered out: no entries survive, and the filter fired once
// per user key ("a" and "c").
ASSERT_TRUE(!c_iter_->Valid());
ASSERT_EQ(filter.filtered, 2);
}

// In bottommost level, values earlier than earliest snapshot can be output
// with sequence = 0.
TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
Expand Down
6 changes: 4 additions & 2 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// (found in the LICENSE.Apache file in the root directory).
//


#include <algorithm>
#include <cstdint>
#include <memory>
Expand All @@ -29,7 +28,10 @@
namespace ROCKSDB_NAMESPACE {

Status DBImpl::FlushForGetLiveFiles() {
return DBImpl::FlushAllColumnFamilies(FlushOptions(),
FlushOptions flush_options;
flush_options.allow_write_stall = true;
flush_options.check_if_compaction_disabled = true;
return DBImpl::FlushAllColumnFamilies(flush_options,
FlushReason::kGetLiveFiles);
}

Expand Down
32 changes: 31 additions & 1 deletion db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -740,9 +740,11 @@ class TestFlushListener : public EventListener {
DBFlushTest* test_;
};

// Disabled, because of
// https://github.com/tikv/rocksdb/pull/389/commits/cc433939ed937a82d0a0ccad1280d5907b048654
TEST_F(
DBFlushTest,
FixUnrecoverableWriteDuringAtomicFlushWaitUntilFlushWouldNotStallWrites) {
DISABLED_FixUnrecoverableWriteDuringAtomicFlushWaitUntilFlushWouldNotStallWrites) {
Options options = CurrentOptions();
options.atomic_flush = true;

Expand Down Expand Up @@ -2012,6 +2014,13 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
}
}

void OnFlushBegin(DB* /*db*/, const FlushJobInfo& info) override {
ASSERT_LE(info.smallest_seqno, info.largest_seqno);
if (info.largest_seqno != seq1) {
ASSERT_EQ(info.largest_seqno, seq2);
}
}

void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
InstrumentedMutex* mutex = db_impl->mutex();
Expand Down Expand Up @@ -3189,6 +3198,27 @@ TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) {
SyncPoint::GetInstance()->DisableProcessing();
}

// Verifies that Flush() with check_if_compaction_disabled=true returns
// Incomplete while manual compaction is disabled, and succeeds again once
// manual compaction and background work are re-enabled.
TEST_P(DBAtomicFlushTest, DisableManualCompaction) {
Options options = CurrentOptions();
options.create_if_missing = true;
// Exercise both atomic and non-atomic flush paths via the test parameter.
options.atomic_flush = GetParam();
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(2, handles_.size());
// Pause background work so the memtables stay unflushed while we test.
ASSERT_OK(dbfull()->PauseBackgroundWork());
ASSERT_OK(Put(0, "key00", "value00"));
ASSERT_OK(Put(1, "key10", "value10"));
dbfull()->DisableManualCompaction();
FlushOptions flush_opts;
flush_opts.wait = true;
flush_opts.check_if_compaction_disabled = true;
// Flush must bail out with Incomplete instead of waiting forever.
ASSERT_TRUE(dbfull()->Flush(flush_opts, handles_).IsIncomplete());
ASSERT_OK(Put(0, "key01", "value01"));
ASSERT_OK(db_->ContinueBackgroundWork());
dbfull()->EnableManualCompaction();
// With compaction re-enabled the same flush request succeeds.
ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
Close();
}

INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());

Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/compacted_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ class CompactedDBImpl : public DBImpl {
const Slice& /*key*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
virtual Status Write(const WriteOptions& /*options*/,
WriteBatch* /*updates*/) override {
using DBImpl::Write;
virtual Status Write(const WriteOptions& /*options*/, WriteBatch* /*updates*/,
PostWriteCallback* /*callback*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
using DBImpl::CompactRange;
Expand Down
Loading
Loading