From 63db33c9d92c3956670f955f1618d5c966ec60fd Mon Sep 17 00:00:00 2001
From: vegetableysm
Date: Fri, 16 Aug 2024 16:16:31 +0800
Subject: [PATCH] Clean code. Remove AllocateMemoryWithSpill from io_adapter::Read.

Signed-off-by: vegetableysm
---
 src/server/memory/usage.h     | 68 ++++++++++++++++++++---------------
 src/server/util/spill_file.cc | 18 ----------
 src/server/util/spill_file.h  |  1 -
 3 files changed, 40 insertions(+), 47 deletions(-)

diff --git a/src/server/memory/usage.h b/src/server/memory/usage.h
index e36e7c408..d5a45398d 100644
--- a/src/server/memory/usage.h
+++ b/src/server/memory/usage.h
@@ -211,14 +211,14 @@ class ColdObjectTracker
       std::lock_guard locked(mu_);
       std::cout << "Thread " << std::this_thread::get_id()
                 << " in Ref, RefPayload:" << id << std::endl;
-      auto it = map_.find(id);
-      if (it == map_.end()) {
-        list_.emplace_front(id, payload);
-        map_.emplace(id, list_.begin());
+      auto it = ref_list_iter_map_.find(id);
+      if (it == ref_list_iter_map_.end()) {
+        ref_list_.emplace_front(id, payload);
+        ref_list_iter_map_.emplace(id, ref_list_.begin());
       } else {
-        list_.erase(it->second);
-        list_.emplace_front(id, payload);
-        it->second = list_.begin();
+        ref_list_.erase(it->second);
+        ref_list_.emplace_front(id, payload);
+        it->second = ref_list_.begin();
       }
     }
 
@@ -239,8 +239,8 @@ class ColdObjectTracker
     Status Unref(const ID id, const bool fast_delete,
                  const std::shared_ptr& bulk_store) {
       std::lock_guard locked(mu_);
-      auto it = map_.find(id);
-      if (it == map_.end()) {
+      auto it = ref_list_iter_map_.find(id);
+      if (it == ref_list_iter_map_.end()) {
         auto spilled = spilled_obj_.find(id);
         if (spilled == spilled_obj_.end()) {
           return Status::OK();
@@ -249,6 +249,10 @@ class ColdObjectTracker
           // NB: explicitly copy the std::shared_ptr as the iterator is not
          // stable.
           auto payload = spilled->second;
+          payload->pointer = bulk_store->AllocateMemoryWithSpill(payload->data_size,
+                                                                 &payload->store_fd,
+                                                                 &payload->map_size,
+                                                                 &payload->data_offset);
           RETURN_ON_ERROR(bulk_store->ReloadPayload(id, payload));
         } else {
           RETURN_ON_ERROR(bulk_store->DeletePayloadFile(id));
@@ -256,8 +260,8 @@ class ColdObjectTracker
         spilled_obj_.erase(spilled);
         return Status::OK();
       } else {
-        list_.erase(it->second);
-        map_.erase(it);
+        ref_list_.erase(it->second);
+        ref_list_iter_map_.erase(it);
         return Status::OK();
       }
     }
@@ -266,11 +270,11 @@ class ColdObjectTracker
       std::lock_guard locked(mu_);
       size_t spilled_sz = 0;
       auto status = Status::OK();
-      auto it = list_.rbegin();
+      auto it = ref_list_.rbegin();
       std::map> pinned_objects;
       std::cout << "Thread " << std::this_thread::get_id()
-                << " in SpillFor, list_ size:" << list_.size() << std::endl;
-      while (it != list_.rend()) {
+                << " in SpillFor, ref_list_ size:" << ref_list_.size() << std::endl;
+      while (it != ref_list_.rend()) {
         if (it->second->IsPinned()) {
           // bypass pinned
           pinned_objects.emplace(it->first, it->second);
@@ -285,12 +289,12 @@ class ColdObjectTracker
           std::cout << "Thread " << std::this_thread::get_id()
                     << ", SpillPayload :" << it->first << " success" << std::endl;
           spilled_sz += it->second->data_size;
-          map_.erase(it->first);
+          ref_list_iter_map_.erase(it->first);
         } else if (s.IsObjectSpilled()) {
           std::cout << "Thread " << std::this_thread::get_id()
                     << ", SpillPayload :" << it->first << " already spilled"
                     << std::endl;
-          map_.erase(it->first);
+          ref_list_iter_map_.erase(it->first);
         } else {
           status += s;
           break;
@@ -300,9 +304,9 @@ class ColdObjectTracker
           break;
         }
       }
-      auto popped_size = std::distance(list_.rbegin(), it);
+      auto popped_size = std::distance(ref_list_.rbegin(), it);
       while (popped_size-- > 0) {
-        list_.pop_back();
+        ref_list_.pop_back();
       }
       // restore pinned objects
       for (auto const& item : pinned_objects) {
@@ -393,8 +397,8 @@ class ColdObjectTracker
     mutable std::recursive_mutex mu_;
 
     // protected by mu_
-    lru_map_t map_;
-    lru_list_t list_;
+    lru_map_t ref_list_iter_map_;
+    lru_list_t ref_list_;
     ska::flat_hash_map> spilled_obj_;
     friend class ColdObjectTracker;
   };
@@ -526,11 +530,23 @@ class ColdObjectTracker
   Status ReloadColdObject(const ObjectID& object_id,
                           const std::shared_ptr& payload,
                           const bool pin) {
-    if (spill_path_.empty()) {
+    if (spill_path_.empty() || payload->data_size == 0) {
       return Status::OK();  // bypass, as spill is not enabled
     }
-    return cold_obj_lru_.ReloadObject(object_id, payload, pin,
+    payload->pointer = AllocateMemoryWithSpill(payload->data_size, &payload->store_fd,
+                                               &payload->map_size, &payload->data_offset);
+    if (payload->pointer == nullptr) {
+      return Status::NotEnoughMemory("Failed to allocate memory of size " +
+                                     std::to_string(payload->data_size) +
+                                     " while reload spilling file");
+    }
+    Status status = cold_obj_lru_.ReloadObject(object_id, payload, pin,
                                       shared_from_self());
+    if (!status.ok()) {
+      BulkAllocator::Free(payload->pointer, payload->data_size);
+      payload->pointer = nullptr;
+    }
+    return status;
   }
 
   /**
@@ -557,8 +573,7 @@ class ColdObjectTracker
    */
   uint8_t* AllocateMemoryWithSpill(const size_t size, int* fd, int64_t* map_size,
                                    ptrdiff_t* offset) {
-    // std::lock_guard locked(allocate_memory_mu_);
-    std::lock_guard locked(cold_obj_lru_.mu_);
+    // std::lock_guard locked(cold_obj_lru_.mu_);
     uint8_t* pointer = nullptr;
     std::cout << "Thread " << std::this_thread::get_id()
               << " before AllocateMemoryWithSpill;" << "size:" << size
@@ -576,8 +591,7 @@ class ColdObjectTracker
     // 2. memory usage is above upper bound
     if (pointer == nullptr ||
         BulkAllocator::Allocated() >= self().mem_spill_upper_bound_) {
-      // std::unique_lock locked(spill_mu_);
-
+      std::lock_guard locked(spill_mu_);
      int64_t min_spill_size = 0;
       if (pointer == nullptr) {
         std::cout << "Thread " << std::this_thread::get_id()
@@ -688,8 +702,6 @@ class ColdObjectTracker
   lru_t cold_obj_lru_;
   std::string spill_path_;
   std::mutex spill_mu_;
-  std::mutex allocate_memory_mu_;
-  std::shared_mutex global_file_mu_;
 };
 
 }  // namespace detail
diff --git a/src/server/util/spill_file.cc b/src/server/util/spill_file.cc
index e0a957ff3..b29e5a0c9 100644
--- a/src/server/util/spill_file.cc
+++ b/src/server/util/spill_file.cc
@@ -39,8 +39,6 @@ Status SpillFileWriter::Init(const ObjectID object_id) {
 }
 
 Status SpillFileWriter::Write(const std::shared_ptr& payload) {
-  //FileLocker::getInstance().lockForWrite(payload->object_id);
-  //std::lock_guard lock(io_mutex_);
   RETURN_ON_ERROR(Init(payload->object_id));
   if (io_adaptor_ == nullptr) {
     return Status::IOError("Can't open io_adaptor");
   }
@@ -57,12 +55,10 @@ Status SpillFileWriter::Write(const std::shared_ptr& payload) {
   std::cout << "Thread " << std::this_thread::get_id()
             << " writing content: " << payload->pointer << std::endl;
   auto status = io_adaptor_->Write(reinterpret_cast(payload->pointer), payload->data_size);
-  //FileLocker::getInstance().unlockForWrite(payload->object_id);
   return status;
 }
 
 Status SpillFileWriter::Sync() {
-  //std::lock_guard lock(io_mutex_);
   RETURN_ON_ERROR(io_adaptor_->Flush());
   io_adaptor_ = nullptr;
   return Status::OK();
 }
@@ -80,8 +76,6 @@ Status SpillFileReader::Init(const ObjectID object_id) {
 }
 
 Status SpillFileReader::Read(const std::shared_ptr& payload,
                              const std::shared_ptr& bulk_store) {
-  //FileLocker::getInstance().lockForRead(payload->object_id);
-  //std::lock_guard lock(io_mutex_);
   RETURN_ON_ERROR(Init(payload->object_id));
   // reload 1. object_id 2. data_size 3. content
   if (io_adaptor_ == nullptr) {
@@ -108,31 +102,19 @@ Status SpillFileReader::Read(const std::shared_ptr& payload,
                            ObjectIDToString(payload->object_id));
     }
   }
-  payload->pointer = bulk_store->AllocateMemoryWithSpill(
-      payload->data_size, &(payload->store_fd), &(payload->map_size),
-      &(payload->data_offset));
-  if (payload->pointer == nullptr) {
-    return Status::NotEnoughMemory("Failed to allocate memory of size " +
-                                   std::to_string(payload->data_size) +
-                                   " while reload spilling file");
-  }
   std::cout << "Thread " << std::this_thread::get_id()
             << " reading content: " << payload->pointer << std::endl;
   RETURN_ON_ERROR(io_adaptor_->Read(payload->pointer, payload->data_size));
   std::cout << "Thread " << std::this_thread::get_id()
             << " is deleting object_id: " << payload->object_id << std::endl;
   RETURN_ON_ERROR(Delete_(payload->object_id));
   io_adaptor_ = nullptr;
-  //FileLocker::getInstance().unlockForRead(payload->object_id);
   return Status::OK();
 }
 
 Status SpillFileReader::Delete_(const ObjectID id) {
-  //FileLocker::getInstance().lockForRead(id);
-  //std::lock_guard lock(io_mutex_);
   if (!io_adaptor_) {
     return Status::Invalid("I/O adaptor is not initialized");
   }
   RETURN_ON_ERROR(io_adaptor_->RemoveFile(spill_path_ + std::to_string(id)));
-  //FileLocker::getInstance().unlockForRead(id);
   return Status::OK();
 }
diff --git a/src/server/util/spill_file.h b/src/server/util/spill_file.h
index bac3ca5b5..b011051a4 100644
--- a/src/server/util/spill_file.h
+++ b/src/server/util/spill_file.h
@@ -121,7 +121,6 @@ class SpillFileReader {
   Status Delete_(const ObjectID id);
 
   std::string spill_path_;
-  std::mutex io_mutex_;
   std::unique_ptr io_adaptor_ = nullptr;
 };