Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable cf uses separete write buffer manager #343

Merged
merged 42 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
5079707
update
SpadeA-Tang Jul 19, 2023
e3ada2d
update
SpadeA-Tang Jul 19, 2023
fadfef3
update
SpadeA-Tang Jul 19, 2023
6ae46ff
update
SpadeA-Tang Jul 26, 2023
cd5a294
update wbm
SpadeA-Tang Jul 26, 2023
af33283
update
SpadeA-Tang Jul 27, 2023
ab43653
update
SpadeA-Tang Jul 27, 2023
b25567e
update
SpadeA-Tang Jul 27, 2023
033d847
update
SpadeA-Tang Jul 27, 2023
199d704
update
SpadeA-Tang Jul 27, 2023
c47e7d8
can pass original wbm tests
SpadeA-Tang Aug 9, 2023
004be9b
update
SpadeA-Tang Aug 14, 2023
30b7862
add test
SpadeA-Tang Aug 14, 2023
41775e2
format
SpadeA-Tang Aug 14, 2023
0095aa8
fix test
SpadeA-Tang Aug 15, 2023
964cf81
update
SpadeA-Tang Aug 15, 2023
64fbf24
update
SpadeA-Tang Aug 15, 2023
e373067
fix test
SpadeA-Tang Aug 15, 2023
4ce4cef
update
SpadeA-Tang Aug 15, 2023
e67df36
format
SpadeA-Tang Aug 15, 2023
755c10f
update
SpadeA-Tang Aug 16, 2023
79ac364
format
SpadeA-Tang Aug 16, 2023
62cbf09
update
SpadeA-Tang Aug 16, 2023
6f6d326
set write buffer manager in cf option
SpadeA-Tang Aug 17, 2023
6b5aabd
fix test use after free
SpadeA-Tang Aug 18, 2023
d621893
update
SpadeA-Tang Aug 21, 2023
e29df90
update
SpadeA-Tang Aug 21, 2023
37d455c
fix test
SpadeA-Tang Aug 21, 2023
9d24a9f
fix test
SpadeA-Tang Aug 21, 2023
a1ca8a9
fix test
SpadeA-Tang Aug 21, 2023
c38c417
fix test
SpadeA-Tang Aug 21, 2023
9924742
fix test
SpadeA-Tang Aug 21, 2023
8fbaf5c
update comment
SpadeA-Tang Aug 22, 2023
85d43bf
update
SpadeA-Tang Aug 23, 2023
21b9ae5
fix test
SpadeA-Tang Aug 24, 2023
4f509cf
update
SpadeA-Tang Aug 24, 2023
8ecc649
format
SpadeA-Tang Aug 24, 2023
947c94b
update::
SpadeA-Tang Aug 24, 2023
ebb40a1
address comment
SpadeA-Tang Aug 24, 2023
d79241f
address comment
SpadeA-Tang Aug 24, 2023
5e404af
address comment
SpadeA-Tang Aug 24, 2023
35caf14
Merge branch '6.29.tikv' into lock-cf-manager
SpadeA-Tang Aug 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1672,10 +1672,13 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
*db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
db_session_id_);
ColumnFamilyData* new_cfd =
new ColumnFamilyData(id, name, dummy_versions, table_cache_,
options.cf_write_buffer_manager != nullptr
? options.cf_write_buffer_manager.get()
: write_buffer_manager_,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this block out looks better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

options, *db_options_, &file_options_, this,
block_cache_tracer_, io_tracer_, db_session_id_);
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
Expand Down
10 changes: 8 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,7 @@ class DBImpl : public DB {
virtual Status GetSortedWalFiles(VectorLogPtr& files) override;
virtual Status GetCurrentWalFile(
std::unique_ptr<LogFile>* current_log_file) override;
virtual Status GetCreationTimeOfOldestFile(
uint64_t* creation_time) override;
virtual Status GetCreationTimeOfOldestFile(uint64_t* creation_time) override;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this line, need to commit without running clang-format locally.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


virtual Status GetUpdatesSince(
SequenceNumber seq_number, std::unique_ptr<TransactionLogIterator>* iter,
Expand Down Expand Up @@ -2283,6 +2282,13 @@ class DBImpl : public DB {
Directories directories_;

WriteBufferManager* write_buffer_manager_;
// For simplicity, CF based write buffer manager does not support stall the
// write.
// Note: std::shared_ptr<WriteBufferManager> is store in ColumnfamilyOptions
// which is destroyed before DBimpl, so we use
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you had any actual error caused by not using shared_ptr? All code using WriteBufferManager should be destroyed before VersionSet, there shouldn't be any problem.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, UnregisterDB is used after VersionSet is destroyed.

// std::shared_ptr<WriteBufferManager> here in the vector.
autovector<std::shared_ptr<WriteBufferManager>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment that this is only modified in Open, and lock is not needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

cf_based_write_buffer_manager_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If CreateColumnFamily passed in an option not in the list, return error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


WriteThread write_thread_;
WriteBatch tmp_batch_;
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1737,6 +1737,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
cfh->GetName().c_str());
Status s;
TEST_SYNC_POINT_CALLBACK("DBImpl::Flush:ScheduleFlushReq", column_family);
if (immutable_db_options_.atomic_flush) {
s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
FlushReason::kManualFlush);
Expand Down
46 changes: 38 additions & 8 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1625,6 +1625,22 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}

DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
for (auto cf : column_families) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic of this block of code is quite isolated. I think it's better to wrap a private function for it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer not, adding a member function has both mental overhead and compilation overhead. Plus this block can't be exactly reused elsewhere.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the major benefit of a private function is better readability: people can skip the detail when they don't care the very detail of the block. Other benefits include better testability, and more modular code.
Of course adding a comment for the block serves the same purpose in terms of readability, but old-school programmers typically prefer a function and even chat-gpt also recommends a wrap function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it is always a trade-off to extract a sub-routine. Reading function has a significant context switch overhead, and for non-pure function that modifies states it is mostly needed to read the implementation of the function. In RocksDB codebase it is generally preferred to use inlined code rather than many small functions. Some believe it helps with code quality.

if (cf.options.cf_write_buffer_manager != nullptr) {
bool already_exist = false;
for (auto m : impl->cf_based_write_buffer_manager_) {
if (m == cf.options.cf_write_buffer_manager) {
already_exist = true;
break;
}
}
if (!already_exist) {
impl->cf_based_write_buffer_manager_.push_back(
cf.options.cf_write_buffer_manager);
}
}
}

s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
if (s.ok()) {
std::vector<std::string> paths;
Expand Down Expand Up @@ -1896,20 +1912,34 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
if (s.ok()) {
impl->StartPeriodicWorkScheduler();

// Newly created handles are already registered during
// `CreateColumnFamily`. We must clear them all to avoid duplicate
// registration.
if (impl->write_buffer_manager_) {
// Newly created handles are already registered during
// `CreateColumnFamily`. We must clear them all to avoid duplicate
// registration.
impl->write_buffer_manager_->UnregisterDB(impl);
for (auto* cf : *handles) {
}
for (auto m : impl->cf_based_write_buffer_manager_) {
m->UnregisterDB(impl);
}

for (size_t i = 0; i < (*handles).size(); ++i) {
auto cf_opt = column_families[i].options;

auto* cf = (*handles)[i];
std::string cf_name = cf->GetName();
auto* write_buffer_manager = cf_opt.cf_write_buffer_manager != nullptr
? cf_opt.cf_write_buffer_manager.get()
: impl->write_buffer_manager_;
if (write_buffer_manager) {
if (cf->GetName() == kDefaultColumnFamilyName) {
impl->write_buffer_manager_->RegisterColumnFamily(
impl, impl->default_cf_handle_);
write_buffer_manager->RegisterColumnFamily(impl,
impl->default_cf_handle_);
} else if (cf->GetName() == kPersistentStatsColumnFamilyName) {
impl->write_buffer_manager_->RegisterColumnFamily(
write_buffer_manager->RegisterColumnFamily(
impl, impl->persist_stats_cf_handle_);
} else {
impl->write_buffer_manager_->RegisterColumnFamily(impl, cf);
write_buffer_manager->RegisterColumnFamily(impl, cf);
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
write_buffer_manager_->MaybeFlush(this);
}
for (auto write_buffer_manager : cf_based_write_buffer_manager_) {
if (UNLIKELY(status.ok() && write_buffer_manager->ShouldFlush())) {
write_buffer_manager->MaybeFlush(this);
}
}

if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
InstrumentedMutexLock l(&mutex_);
Expand Down Expand Up @@ -2282,6 +2287,12 @@ size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
bsize = std::min<size_t>(bsize, buffer_size);
}
}
for (auto manager : cf_based_write_buffer_manager_) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be min(db_write_buffer_size, sum(cf_wbm_size) + db_wbm_size).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

size_t buffer_size = manager->flush_size();
if (buffer_size > 0) {
bsize = std::min<size_t>(bsize, buffer_size);
}
}

return bsize;
}
Expand Down
2 changes: 1 addition & 1 deletion db/db_merge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DBMergeTest : public testing::Test {
std::to_string(cf_id), ColumnFamilyOptions(options_)));
}
}
return std::move(column_families);
return column_families;
}

std::string GenDBPath(uint32_t db_id) {
Expand Down
37 changes: 24 additions & 13 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -591,33 +591,39 @@ Options DBTestBase::GetOptions(
return options;
}

void DBTestBase::CreateColumnFamilies(const std::vector<std::string>& cfs,
const Options& options) {
ColumnFamilyOptions cf_opts(options);
void DBTestBase::CreateColumnFamilies(
const std::vector<std::string>& cfs, const Options& options,
std::unordered_map<std::string, std::shared_ptr<WriteBufferManager>> wfms) {
size_t cfi = handles_.size();
handles_.resize(cfi + cfs.size());
for (auto cf : cfs) {
ColumnFamilyOptions cf_opts(options);
if (wfms.find(cf) != wfms.end()) {
cf_opts.cf_write_buffer_manager = wfms[cf];
}
Status s = db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]);
ASSERT_OK(s);
}
}

void DBTestBase::CreateAndReopenWithCF(const std::vector<std::string>& cfs,
const Options& options) {
CreateColumnFamilies(cfs, options);
void DBTestBase::CreateAndReopenWithCF(
const std::vector<std::string>& cfs, const Options& options,
std::unordered_map<std::string, std::shared_ptr<WriteBufferManager>> wfms) {
CreateColumnFamilies(cfs, options, wfms);
std::vector<std::string> cfs_plus_default = cfs;
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
ReopenWithColumnFamilies(cfs_plus_default, options);
ReopenWithColumnFamilies(cfs_plus_default, options, wfms);
}

void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const std::vector<Options>& options) {
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
}

void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const Options& options) {
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
void DBTestBase::ReopenWithColumnFamilies(
const std::vector<std::string>& cfs, const Options& options,
std::unordered_map<std::string, std::shared_ptr<WriteBufferManager>> wfms) {
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options, wfms));
}

void DBTestBase::SetTimeElapseOnlySleepOnReopen(DBOptions* options) {
Expand Down Expand Up @@ -654,12 +660,16 @@ void DBTestBase::MaybeInstallTimeElapseOnlySleep(const DBOptions& options) {
}

Status DBTestBase::TryReopenWithColumnFamilies(
const std::vector<std::string>& cfs, const std::vector<Options>& options) {
const std::vector<std::string>& cfs, const std::vector<Options>& options,
std::unordered_map<std::string, std::shared_ptr<WriteBufferManager>> wfms) {
Close();
EXPECT_EQ(cfs.size(), options.size());
std::vector<ColumnFamilyDescriptor> column_families;
for (size_t i = 0; i < cfs.size(); ++i) {
column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
if (wfms.find(cfs[i]) != wfms.end()) {
column_families.back().options.cf_write_buffer_manager = wfms[cfs[i]];
}
}
DBOptions db_opts = DBOptions(options[0]);
last_options_ = options[0];
Expand All @@ -668,10 +678,11 @@ Status DBTestBase::TryReopenWithColumnFamilies(
}

Status DBTestBase::TryReopenWithColumnFamilies(
const std::vector<std::string>& cfs, const Options& options) {
const std::vector<std::string>& cfs, const Options& options,
std::unordered_map<std::string, std::shared_ptr<WriteBufferManager>> wfms) {
Close();
std::vector<Options> v_opts(cfs.size(), options);
return TryReopenWithColumnFamilies(cfs, v_opts);
return TryReopenWithColumnFamilies(cfs, v_opts, wfms);
}

void DBTestBase::Reopen(const Options& options) {
Expand Down
30 changes: 20 additions & 10 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -967,23 +967,33 @@ class DBTestBase : public testing::Test {

DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_); }

void CreateColumnFamilies(const std::vector<std::string>& cfs,
const Options& options);
void CreateColumnFamilies(
const std::vector<std::string>& cfs, const Options& options,
std::unordered_map<std::string, std::shared_ptr<WriteBufferManager>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the intended use case of this function. If you have different options for each cf, use ReopenWithColumnFamilies(const std::vector<std::string>& cfs, const std::vector<Options>& options); instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

wfms = {});

void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
const Options& options);
void CreateAndReopenWithCF(
const std::vector<std::string>& cfs, const Options& options,
std::unordered_map<std::string, std::shared_ptr<WriteBufferManager>>
wfms = {});

void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const std::vector<Options>& options);

void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const Options& options);
void ReopenWithColumnFamilies(
const std::vector<std::string>& cfs, const Options& options,
std::unordered_map<std::string, std::shared_ptr<WriteBufferManager>>
wfms = {});

Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const std::vector<Options>& options);
Status TryReopenWithColumnFamilies(
const std::vector<std::string>& cfs, const std::vector<Options>& options,
std::unordered_map<std::string, std::shared_ptr<WriteBufferManager>>
wfms = {});

Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const Options& options);
Status TryReopenWithColumnFamilies(
const std::vector<std::string>& cfs, const Options& options,
std::unordered_map<std::string, std::shared_ptr<WriteBufferManager>>
wfms = {});

void Reopen(const Options& options);

Expand Down
Loading
Loading