diff --git a/db/column_family.cc b/db/column_family.cc
index 3ea45074ea9..bb9cb87796f 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -1835,8 +1835,11 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
     const std::string& name, uint32_t id, Version* dummy_versions,
     const ColumnFamilyOptions& options) {
   assert(column_families_.find(name) == column_families_.end());
+  auto* write_buffer_manager = options.cf_write_buffer_manager != nullptr
+                                   ? options.cf_write_buffer_manager.get()
+                                   : write_buffer_manager_;
   ColumnFamilyData* new_cfd = new ColumnFamilyData(
-      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
+      id, name, dummy_versions, table_cache_, write_buffer_manager, options,
       *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
       db_id_, db_session_id_);
   column_families_.insert({name, id});
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 2e1fac618ee..5dc990fd024 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -659,6 +659,11 @@ Status DBImpl::CloseHelper() {
     delete txn_entry.second;
   }
 
+  // We can only access cf_based_write_buffer_manager_ before
+  // versions_.reset(); after that all CF write buffer managers are freed.
+  for (auto m : cf_based_write_buffer_manager_) {
+    m->UnregisterDB(this);
+  }
   // versions need to be destroyed before table_cache since it can hold
   // references to table_cache.
   versions_.reset();
@@ -3654,7 +3659,20 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
   if (s.ok()) {
     NewThreadStatusCfInfo(
         static_cast_with_check<ColumnFamilyHandleImpl>(*handle)->cfd());
-    if (write_buffer_manager_ != nullptr) {
+    if (cf_options.cf_write_buffer_manager != nullptr) {
+      auto* write_buffer_manager = cf_options.cf_write_buffer_manager.get();
+      bool exist = false;
+      for (auto m : cf_based_write_buffer_manager_) {
+        if (m == write_buffer_manager) {
+          exist = true;
+        }
+      }
+      if (!exist) {
+        return Status::NotSupported(
+            "New cf write buffer manager is not supported after Open");
+      }
+      write_buffer_manager->RegisterColumnFamily(this, *handle);
+    } else if (write_buffer_manager_ != nullptr) {
       write_buffer_manager_->RegisterColumnFamily(this, *handle);
     }
   }
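Both hunks above apply the same precedence rule. As a reading aid only (this helper does not exist in the patch), the rule amounts to the following sketch:

// Reading aid only; hypothetical helper, not part of the patch.
#include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h"

namespace ROCKSDB_NAMESPACE {

WriteBufferManager* EffectiveWriteBufferManager(
    const ColumnFamilyOptions& cf_options,
    WriteBufferManager* db_wide_manager) {
  // A CF level manager, when present, takes precedence over the DB-wide one.
  return cf_options.cf_write_buffer_manager != nullptr
             ? cf_options.cf_write_buffer_manager.get()
             : db_wide_manager;
}

}  // namespace ROCKSDB_NAMESPACE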
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 548fed1ed08..19d413e5321 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -2596,6 +2596,10 @@ class DBImpl : public DB {
   Directories directories_;
 
   WriteBufferManager* write_buffer_manager_;
+  // For simplicity, CF based write buffer managers do not support stalling
+  // writes.
+  // Note: this is only modified during Open, so no mutex is needed.
+  autovector<WriteBufferManager*> cf_based_write_buffer_manager_;
 
   WriteThread write_thread_;
   WriteBatch tmp_batch_;
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index c8209cd4de9..c2bd7af0476 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -1961,6 +1961,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
   ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
                  cfh->GetName().c_str());
   Status s;
+  TEST_SYNC_POINT_CALLBACK("DBImpl::Flush:ScheduleFlushReq", column_family);
   if (immutable_db_options_.atomic_flush) {
     s = AtomicFlushMemTables(flush_options, FlushReason::kManualFlush,
                              {cfh->cfd()});
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index a6ba07bd329..d41cd5b3a70 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -1969,6 +1969,22 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
   }
 
   DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
+  for (auto cf : column_families) {
+    if (cf.options.cf_write_buffer_manager != nullptr) {
+      auto* write_buffer_manager = cf.options.cf_write_buffer_manager.get();
+      bool already_exist = false;
+      for (auto m : impl->cf_based_write_buffer_manager_) {
+        if (m == write_buffer_manager) {
+          already_exist = true;
+          break;
+        }
+      }
+      if (!already_exist) {
+        impl->cf_based_write_buffer_manager_.push_back(write_buffer_manager);
+      }
+    }
+  }
+
   if (!impl->immutable_db_options_.info_log) {
     s = impl->init_logger_creation_s_;
     delete impl;
@@ -2266,19 +2282,29 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
   if (s.ok()) {
     s = impl->StartPeriodicTaskScheduler();
     if (impl->write_buffer_manager_) {
-      // Newly created handles are already registered during
-      // `CreateColumnFamily`. We must clear them all to avoid duplicate
-      // registration.
       impl->write_buffer_manager_->UnregisterDB(impl);
-      for (auto* cf : *handles) {
+    }
+    for (auto m : impl->cf_based_write_buffer_manager_) {
+      m->UnregisterDB(impl);
+    }
+
+    for (size_t i = 0; i < (*handles).size(); ++i) {
+      auto cf_opt = column_families[i].options;
+
+      auto* cf = (*handles)[i];
+      auto* write_buffer_manager = cf_opt.cf_write_buffer_manager != nullptr
+                                       ? cf_opt.cf_write_buffer_manager.get()
+                                       : impl->write_buffer_manager_;
+      if (write_buffer_manager) {
         if (cf->GetName() == kDefaultColumnFamilyName) {
-          impl->write_buffer_manager_->RegisterColumnFamily(
-              impl, impl->default_cf_handle_);
+          write_buffer_manager->RegisterColumnFamily(impl,
+                                                     impl->default_cf_handle_);
         } else if (cf->GetName() == kPersistentStatsColumnFamilyName) {
-          impl->write_buffer_manager_->RegisterColumnFamily(
+          write_buffer_manager->RegisterColumnFamily(
               impl, impl->persist_stats_cf_handle_);
         } else {
-          impl->write_buffer_manager_->RegisterColumnFamily(impl, cf);
+          write_buffer_manager->RegisterColumnFamily(impl, cf);
         }
       }
     }
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index f3c26279764..7c174c0e73b 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -1456,6 +1456,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
   if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
     write_buffer_manager_->MaybeFlush(this);
   }
+  for (auto write_buffer_manager : cf_based_write_buffer_manager_) {
+    if (UNLIKELY(status.ok() && write_buffer_manager->ShouldFlush())) {
+      write_buffer_manager->MaybeFlush(this);
+    }
+  }
 
   if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
     InstrumentedMutexLock l(&mutex_);
@@ -2516,6 +2521,9 @@ size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
   if (immutable_db_options_.write_buffer_manager) {
     size_t buffer_size =
         immutable_db_options_.write_buffer_manager->flush_size();
+    for (auto manager : cf_based_write_buffer_manager_) {
+      buffer_size += manager->flush_size();
+    }
     if (buffer_size > 0) {
       bsize = std::min(bsize, buffer_size);
     }
diff --git a/db/db_test_util.cc b/db/db_test_util.cc
index 28d67527fe9..6d46be7ea1a 100644
--- a/db/db_test_util.cc
+++ b/db/db_test_util.cc
@@ -617,6 +617,22 @@ void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
   ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
 }
 
+void DBTestBase::OpenWithCFWriteBufferManager(
+    const std::vector<std::string>& cfs,
+    const std::vector<std::shared_ptr<WriteBufferManager>> wbms,
+    const Options& options) {
+  CreateColumnFamilies(cfs, options);
+  std::vector<std::string> cfs_plus_default = cfs;
+  cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
+  std::vector<Options> cf_options;
+  for (size_t i = 0; i < wbms.size(); ++i) {
+    auto o = options;
+    o.cf_write_buffer_manager = wbms[i];
+    cf_options.push_back(o);
+  }
+  ReopenWithColumnFamilies(cfs_plus_default, cf_options);
+}
+
 void DBTestBase::SetTimeElapseOnlySleepOnReopen(DBOptions* options) {
   time_elapse_only_sleep_on_reopen_ = true;
diff --git a/db/db_test_util.h b/db/db_test_util.h
index dc34352dc2e..f1298dc6bf5 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -1118,6 +1118,11 @@ class DBTestBase : public testing::Test {
   Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                      const Options& options);
 
+  void OpenWithCFWriteBufferManager(
+      const std::vector<std::string>& cfs,
+      const std::vector<std::shared_ptr<WriteBufferManager>> wbms,
+      const Options& options);
+
   void Reopen(const Options& options);
 
   void Close();
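A quick orientation for the tests that follow (illustrative sketch, not part of the patch): `wbms` is indexed in the order {default CF, cfs...} because the helper above prepends kDefaultColumnFamilyName, and a nullptr entry leaves that CF on the DB-wide write_buffer_manager. Inside a DBTestBase-derived test this looks roughly like:

// Sketch only; sizes and CF names are arbitrary, and the three-argument
// WriteBufferManager constructor mirrors the tests below.
Options options = CurrentOptions();
options.write_buffer_manager.reset(
    new WriteBufferManager(100000, nullptr, 1.0));
auto cf_wbm = std::make_shared<WriteBufferManager>(100000, nullptr, 1.0);
std::vector<std::string> cfs = {"cf1", "cf2"};
// Default CF and cf1 stay on the DB-wide manager; cf2 uses cf_wbm.
std::vector<std::shared_ptr<WriteBufferManager>> wbms = {nullptr, nullptr,
                                                         cf_wbm};
OpenWithCFWriteBufferManager(cfs, wbms, options);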
diff --git a/db/db_write_buffer_manager_test.cc b/db/db_write_buffer_manager_test.cc
index 41db8b4ea1e..28dc9908c6a 100644
--- a/db/db_write_buffer_manager_test.cc
+++ b/db/db_write_buffer_manager_test.cc
@@ -179,6 +179,237 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
+// Compared with `SharedWriteBufferAcrossCFs2`, this test uses a CF based
+// write buffer manager. A CF level write buffer manager does not block
+// writes even when its stall threshold is exceeded, whereas the DB level
+// write buffer manager blocks all writes, including writes to CFs that do
+// not use it.
+TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs3) {
+  Options options = CurrentOptions();
+  options.arena_block_size = 4096;
+  options.write_buffer_size = 500000;  // this is never hit
+  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
+  ASSERT_LT(cache->GetUsage(), 256 * 1024);
+  cost_cache_ = GetParam();
+
+  std::shared_ptr<WriteBufferManager> cf_write_buffer_manager;
+  if (cost_cache_) {
+    options.write_buffer_manager.reset(
+        new WriteBufferManager(100000, cache, 1.0));
+    cf_write_buffer_manager.reset(new WriteBufferManager(100000, cache, 1.0));
+  } else {
+    options.write_buffer_manager.reset(
+        new WriteBufferManager(100000, nullptr, 1.0));
+    cf_write_buffer_manager.reset(
+        new WriteBufferManager(100000, nullptr, 1.0));
+  }
+
+  WriteOptions wo;
+  wo.disableWAL = true;
+
+  std::vector<std::string> cfs = {"cf1", "cf2", "cf3", "cf4", "cf5"};
+  std::vector<std::shared_ptr<WriteBufferManager>> wbms = {
+      nullptr, nullptr, nullptr, nullptr,
+      cf_write_buffer_manager, cf_write_buffer_manager};
+  OpenWithCFWriteBufferManager(cfs, wbms, options);
+
+  ASSERT_OK(Put(4, Key(1), DummyString(30000), wo));
+  ASSERT_OK(Put(5, Key(1), DummyString(40000), wo));
+  ASSERT_OK(Put(4, Key(1), DummyString(40000), wo));
+  // Now cf_write_buffer_manager has reached the stall level, but it does not
+  // block the write.
+
+  int num_writers_total = 6;
+  for (int i = 0; i < num_writers_total; i++) {
+    ASSERT_OK(Put(i, Key(1), DummyString(1), wo));
+  }
+
+  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
+  ASSERT_OK(Flush(3));
+  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
+  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
+  ASSERT_OK(Flush(0));
+
+  // Write to "Default", "cf2" and "cf3". No flush will be triggered.
+  ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
+  ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
+  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
+
+  ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));
+  // WriteBufferManager::buffer_size_ has been exceeded after the previous
+  // write completed.
+
+  std::unordered_set<WriteThread::Writer*> w_set;
+  std::vector<port::Thread> threads;
+  int wait_count_db = 0;
+  int num_writers1 = 4;  // default, cf1-cf3
+  InstrumentedMutex mutex;
+  InstrumentedCondVar cv(&mutex);
+  std::atomic<int> thread_num(0);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
+        "DBImpl::BackgroundCallFlush:start"}});
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WBMStallInterface::BlockDB", [&](void*) {
+        InstrumentedMutexLock lock(&mutex);
+        wait_count_db++;
+        cv.SignalAll();
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::WriteStall::Wait", [&](void* arg) {
+        InstrumentedMutexLock lock(&mutex);
+        WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
+        w_set.insert(w);
+        // Allow the flush to continue if all writer threads are blocked.
+        if (w_set.size() == (unsigned long)num_writers1) {
+          TEST_SYNC_POINT(
+              "DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
+        }
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  bool s = true;
+
+  std::function<void(int)> writer = [&](int cf) {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    Status tmp = Put(cf, Slice(key), DummyString(1), wo);
+    InstrumentedMutexLock lock(&mutex);
+    s = s && tmp.ok();
+  };
+
+  threads.emplace_back(writer, 1);
+  {
+    InstrumentedMutexLock lock(&mutex);
+    while (wait_count_db != 1) {
+      cv.Wait();
+    }
+  }
+  for (int i = 0; i < num_writers_total; i++) {
+    threads.emplace_back(writer, i % 6);
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  ASSERT_TRUE(s);
+
+  // Number of DBs blocked.
+  ASSERT_EQ(wait_count_db, 1);
+  // Number of writer threads blocked.
+  ASSERT_EQ(w_set.size(), num_writers_total);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+// Test that multiple write buffer managers trigger flushes independently.
+TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs4) {
+  Options options = CurrentOptions();
+  options.arena_block_size = 4096;
+  options.write_buffer_size = 500000;  // this is never hit
+  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
+  ASSERT_LT(cache->GetUsage(), 256 * 1024);
+  cost_cache_ = GetParam();
+
+  std::shared_ptr<WriteBufferManager> cf_write_buffer_manager;
+  if (cost_cache_) {
+    options.write_buffer_manager.reset(
+        new WriteBufferManager(100000, cache, 0.0));
+    cf_write_buffer_manager.reset(new WriteBufferManager(100000, cache, 0.0));
+  } else {
+    options.write_buffer_manager.reset(
+        new WriteBufferManager(100000, nullptr, 0.0));
+    cf_write_buffer_manager.reset(
+        new WriteBufferManager(100000, nullptr, 0.0));
+  }
+
+  WriteOptions wo;
+  wo.disableWAL = true;
+
+  std::vector<std::string> cfs = {"cf1", "cf2", "cf3", "cf4", "cf5"};
+  std::vector<std::shared_ptr<WriteBufferManager>> wbms = {
+      nullptr, nullptr, nullptr, nullptr,
+      cf_write_buffer_manager, cf_write_buffer_manager};
+  OpenWithCFWriteBufferManager(cfs, wbms, options);
+
+  ASSERT_OK(Put(4, Key(1), DummyString(30000), wo));
+  ASSERT_OK(Put(4, Key(1), DummyString(40000), wo));
+
+  ASSERT_OK(Put(1, Key(1), DummyString(40000), wo));
+  ASSERT_OK(Put(2, Key(1), DummyString(30000), wo));
+
+  ASSERT_OK(Put(5, Key(1), DummyString(50000), wo));
+
+  // The second WriteBufferManager's buffer_size_ has been exceeded after the
+  // previous write completed.
+
+  std::unordered_set<std::string> flush_cfs;
+  std::vector<port::Thread> threads;
+  int num_writers_total = 6;
+  InstrumentedMutex mutex;
+  InstrumentedCondVar cv(&mutex);
+  std::atomic<int> thread_num(0);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
+        "DBImpl::BackgroundCallFlush:start"}});
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::Flush:ScheduleFlushReq", [&](void* arg) {
+        InstrumentedMutexLock lock(&mutex);
+        auto* cfh = reinterpret_cast<ColumnFamilyHandle*>(arg);
+        flush_cfs.insert(cfh->GetName());
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  bool s = true;
+
+  std::function<void(int, int)> writer = [&](int cf, int val_size) {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    Status tmp = Put(cf, Slice(key), DummyString(val_size), wo);
+    InstrumentedMutexLock lock(&mutex);
+    s = s && tmp.ok();
+  };
+
+  for (int i = 0; i < num_writers_total; i++) {
+    threads.emplace_back(writer, i % 6, 1);
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  threads.clear();
+
+  ASSERT_TRUE(s);
+  ASSERT_EQ(flush_cfs.size(), 1);
+  ASSERT_NE(flush_cfs.find("cf4"), flush_cfs.end());
+  flush_cfs.clear();
+
+  ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));
+
+  for (int i = 0; i < num_writers_total; i++) {
+    threads.emplace_back(writer, i % 6, 1);
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  ASSERT_EQ(flush_cfs.size(), 1);
+  ASSERT_NE(flush_cfs.find("cf1"), flush_cfs.end());
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_P(DBWriteBufferManagerTest, FreeMemoryOnDestroy) {
   Options options = CurrentOptions();
   options.arena_block_size = 4096;
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index fa1d4296b1b..fffbe8bd1ba 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -349,6 +349,13 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
   // Dynamically changeable through SetOptions() API
   uint32_t memtable_max_range_deletions = 0;
 
+  // Column family based write buffer manager. If this is set, this column
+  // family will not report memtable memory usage to the write buffer manager
+  // in DBImpl.
+  //
+  // Default: null
+  std::shared_ptr<WriteBufferManager> cf_write_buffer_manager = nullptr;
+
   // Create ColumnFamilyOptions with default values for all fields
   ColumnFamilyOptions();
   // Create ColumnFamilyOptions from Options
diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc
index ced8597a9d6..17fcec521cc 100644
--- a/options/options_settable_test.cc
+++ b/options/options_settable_test.cc
@@ -438,6 +438,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
       sizeof(std::shared_ptr)},
      {offsetof(struct ColumnFamilyOptions, sst_partitioner_factory),
       sizeof(std::shared_ptr<SstPartitionerFactory>)},
+     {offset_of(&ColumnFamilyOptions::cf_write_buffer_manager),
+      sizeof(std::shared_ptr<WriteBufferManager>)},
   };
 
   char* options_ptr = new char[sizeof(ColumnFamilyOptions)];
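For reviewers, a minimal end-to-end usage sketch of the new option (not part of the diff). The DB path and CF names are made up, and the three-argument WriteBufferManager constructor is assumed from the fork's tests above rather than taken from documented API:

// Usage sketch under the assumptions stated above.
#include <cassert>
#include <memory>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  Options options;
  options.create_if_missing = true;
  options.create_missing_column_families = true;
  // DB-wide manager: CFs without a cf_write_buffer_manager report here.
  options.write_buffer_manager =
      std::make_shared<WriteBufferManager>(64 << 20, nullptr, 1.0);

  // Dedicated manager shared by a group of column families.
  auto cf_wbm = std::make_shared<WriteBufferManager>(16 << 20, nullptr, 1.0);
  ColumnFamilyOptions cf_opts(options);
  cf_opts.cf_write_buffer_manager = cf_wbm;

  std::vector<ColumnFamilyDescriptor> descriptors = {
      {kDefaultColumnFamilyName, ColumnFamilyOptions(options)},
      {"cf_shared", cf_opts},                      // accounted by cf_wbm
      {"cf_other", ColumnFamilyOptions(options)},  // accounted by the DB-wide manager
  };

  std::vector<ColumnFamilyHandle*> handles;
  DB* db = nullptr;
  Status s = DB::Open(options, "/tmp/cf_wbm_demo", descriptors, &handles, &db);
  assert(s.ok());

  // Managers that were not passed to Open() cannot be introduced later: per
  // CreateColumnFamilyImpl above, this returns Status::NotSupported.
  ColumnFamilyOptions late(options);
  late.cf_write_buffer_manager =
      std::make_shared<WriteBufferManager>(8 << 20, nullptr, 1.0);
  ColumnFamilyHandle* late_handle = nullptr;
  s = db->CreateColumnFamily(late, "late_cf", &late_handle);
  assert(s.IsNotSupported());
  if (late_handle != nullptr) {
    delete late_handle;
  }

  for (auto* h : handles) {
    delete h;
  }
  delete db;
  return 0;
}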