Skip to content

Commit

Permalink
Clean code.
Browse files Browse the repository at this point in the history
Remove AllocateMemoryWithSpill from io_adapter::Read.

Signed-off-by: vegetableysm <[email protected]>
  • Loading branch information
vegetableysm committed Aug 16, 2024
1 parent 40e32f5 commit 63db33c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 47 deletions.
68 changes: 40 additions & 28 deletions src/server/memory/usage.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ class ColdObjectTracker
std::lock_guard<decltype(mu_)> locked(mu_);
std::cout << "Thread " << std::this_thread::get_id()
<< " in Ref, RefPayload:" << id << std::endl;
auto it = map_.find(id);
if (it == map_.end()) {
list_.emplace_front(id, payload);
map_.emplace(id, list_.begin());
auto it = ref_list_iter_map_.find(id);
if (it == ref_list_iter_map_.end()) {
ref_list_.emplace_front(id, payload);
ref_list_iter_map_.emplace(id, ref_list_.begin());
} else {
list_.erase(it->second);
list_.emplace_front(id, payload);
it->second = list_.begin();
ref_list_.erase(it->second);
ref_list_.emplace_front(id, payload);
it->second = ref_list_.begin();
}
}

Expand All @@ -239,8 +239,8 @@ class ColdObjectTracker
Status Unref(const ID id, const bool fast_delete,
const std::shared_ptr<Der>& bulk_store) {
std::lock_guard<decltype(mu_)> locked(mu_);
auto it = map_.find(id);
if (it == map_.end()) {
auto it = ref_list_iter_map_.find(id);
if (it == ref_list_iter_map_.end()) {
auto spilled = spilled_obj_.find(id);
if (spilled == spilled_obj_.end()) {
return Status::OK();
Expand All @@ -249,15 +249,19 @@ class ColdObjectTracker
// NB: explicitly copy the std::shared_ptr as the iterator is not
// stable.
auto payload = spilled->second;
payload->pointer = bulk_store->AllocateMemoryWithSpill(payload->data_size,
&payload->store_fd,
&payload->map_size,
&payload->data_offset);
RETURN_ON_ERROR(bulk_store->ReloadPayload(id, payload));
} else {
RETURN_ON_ERROR(bulk_store->DeletePayloadFile(id));
}
spilled_obj_.erase(spilled);
return Status::OK();
} else {
list_.erase(it->second);
map_.erase(it);
ref_list_.erase(it->second);
ref_list_iter_map_.erase(it);
return Status::OK();
}
}
Expand All @@ -266,11 +270,11 @@ class ColdObjectTracker
std::lock_guard<decltype(mu_)> locked(mu_);
size_t spilled_sz = 0;
auto status = Status::OK();
auto it = list_.rbegin();
auto it = ref_list_.rbegin();
std::map<ObjectID, std::shared_ptr<Payload>> pinned_objects;
std::cout << "Thread " << std::this_thread::get_id()
<< " in SpillFor, list_ size:" << list_.size() << std::endl;
while (it != list_.rend()) {
<< " in SpillFor, ref_list_ size:" << ref_list_.size() << std::endl;
while (it != ref_list_.rend()) {
if (it->second->IsPinned()) {
// bypass pinned
pinned_objects.emplace(it->first, it->second);
Expand All @@ -285,12 +289,12 @@ class ColdObjectTracker
std::cout << "Thread " << std::this_thread::get_id()
<< ", SpillPayload :" << it->first << " success" << std::endl;
spilled_sz += it->second->data_size;
map_.erase(it->first);
ref_list_iter_map_.erase(it->first);
} else if (s.IsObjectSpilled()) {
std::cout << "Thread " << std::this_thread::get_id()
<< ", SpillPayload :" << it->first << " already spilled"
<< std::endl;
map_.erase(it->first);
ref_list_iter_map_.erase(it->first);
} else {
status += s;
break;
Expand All @@ -300,9 +304,9 @@ class ColdObjectTracker
break;
}
}
auto popped_size = std::distance(list_.rbegin(), it);
auto popped_size = std::distance(ref_list_.rbegin(), it);
while (popped_size-- > 0) {
list_.pop_back();
ref_list_.pop_back();
}
// restore pinned objects
for (auto const& item : pinned_objects) {
Expand Down Expand Up @@ -393,8 +397,8 @@ class ColdObjectTracker

mutable std::recursive_mutex mu_;
// protected by mu_
lru_map_t map_;
lru_list_t list_;
lru_map_t ref_list_iter_map_;
lru_list_t ref_list_;
ska::flat_hash_map<ID, std::shared_ptr<P>> spilled_obj_;
friend class ColdObjectTracker;
};
Expand Down Expand Up @@ -526,11 +530,23 @@ class ColdObjectTracker
Status ReloadColdObject(const ObjectID& object_id,
const std::shared_ptr<Payload>& payload,
const bool pin) {
if (spill_path_.empty()) {
if (spill_path_.empty() || payload->data_size == 0) {
return Status::OK(); // bypass, as spill is not enabled
}
return cold_obj_lru_.ReloadObject(object_id, payload, pin,
payload->pointer = AllocateMemoryWithSpill(payload->data_size, &payload->store_fd,
&payload->map_size, &payload->data_offset);
if (payload->pointer == nullptr) {
return Status::NotEnoughMemory("Failed to allocate memory of size " +
std::to_string(payload->data_size) +
" while reload spilling file");
}
Status status = cold_obj_lru_.ReloadObject(object_id, payload, pin,
shared_from_self());
if (!status.ok()) {
BulkAllocator::Free(payload->pointer, payload->data_size);
payload->pointer = nullptr;
}
return status;
}

/**
Expand All @@ -557,8 +573,7 @@ class ColdObjectTracker
*/
uint8_t* AllocateMemoryWithSpill(const size_t size, int* fd,
int64_t* map_size, ptrdiff_t* offset) {
// std::lock_guard<std::mutex> locked(allocate_memory_mu_);
std::lock_guard<decltype(cold_obj_lru_.mu_)> locked(cold_obj_lru_.mu_);
// std::lock_guard<decltype(cold_obj_lru_.mu_)> locked(cold_obj_lru_.mu_);
uint8_t* pointer = nullptr;
std::cout << "Thread " << std::this_thread::get_id()
<< " before AllocateMemoryWithSpill;" << "size:" << size
Expand All @@ -576,8 +591,7 @@ class ColdObjectTracker
// 2. memory usage is above upper bound
if (pointer == nullptr ||
BulkAllocator::Allocated() >= self().mem_spill_upper_bound_) {
// std::unique_lock<std::mutex> locked(spill_mu_);

std::lock_guard<std::mutex> locked(spill_mu_);
int64_t min_spill_size = 0;
if (pointer == nullptr) {
std::cout << "Thread " << std::this_thread::get_id()
Expand Down Expand Up @@ -688,8 +702,6 @@ class ColdObjectTracker
lru_t cold_obj_lru_;
std::string spill_path_;
std::mutex spill_mu_;
std::mutex allocate_memory_mu_;
std::shared_mutex global_file_mu_;
};

} // namespace detail
Expand Down
18 changes: 0 additions & 18 deletions src/server/util/spill_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ Status SpillFileWriter::Init(const ObjectID object_id) {
}

Status SpillFileWriter::Write(const std::shared_ptr<Payload>& payload) {
//FileLocker::getInstance().lockForWrite(payload->object_id);
//std::lock_guard<std::mutex> lock(io_mutex_);
RETURN_ON_ERROR(Init(payload->object_id));
if (io_adaptor_ == nullptr) {
return Status::IOError("Can't open io_adaptor");
Expand All @@ -57,12 +55,10 @@ Status SpillFileWriter::Write(const std::shared_ptr<Payload>& payload) {
std::cout << "Thread " << std::this_thread::get_id() << " writing content: " << payload->pointer << std::endl;
auto status = io_adaptor_->Write(reinterpret_cast<const char*>(payload->pointer),
payload->data_size);
//FileLocker::getInstance().unlockForWrite(payload->object_id);
return status;
}

Status SpillFileWriter::Sync() {
//std::lock_guard<std::mutex> lock(io_mutex_);
RETURN_ON_ERROR(io_adaptor_->Flush());
io_adaptor_ = nullptr;
return Status::OK();
Expand All @@ -80,8 +76,6 @@ Status SpillFileReader::Init(const ObjectID object_id) {

Status SpillFileReader::Read(const std::shared_ptr<Payload>& payload,
const std::shared_ptr<BulkStore>& bulk_store) {
//FileLocker::getInstance().lockForRead(payload->object_id);
//std::lock_guard<std::mutex> lock(io_mutex_);
RETURN_ON_ERROR(Init(payload->object_id));
// reload 1. object_id 2. data_size 3. content
if (io_adaptor_ == nullptr) {
Expand All @@ -108,31 +102,19 @@ Status SpillFileReader::Read(const std::shared_ptr<Payload>& payload,
ObjectIDToString(payload->object_id));
}
}
payload->pointer = bulk_store->AllocateMemoryWithSpill(
payload->data_size, &(payload->store_fd), &(payload->map_size),
&(payload->data_offset));
if (payload->pointer == nullptr) {
return Status::NotEnoughMemory("Failed to allocate memory of size " +
std::to_string(payload->data_size) +
" while reload spilling file");
}
std::cout << "Thread " << std::this_thread::get_id() << " reading content: " << payload->pointer << std::endl;
RETURN_ON_ERROR(io_adaptor_->Read(payload->pointer, payload->data_size));
std::cout << "Thread " << std::this_thread::get_id() << " is deleting object_id: " << payload->object_id << std::endl;
RETURN_ON_ERROR(Delete_(payload->object_id));
io_adaptor_ = nullptr;
//FileLocker::getInstance().unlockForRead(payload->object_id);
return Status::OK();
}

Status SpillFileReader::Delete_(const ObjectID id) {
//FileLocker::getInstance().lockForRead(id);
//std::lock_guard<std::mutex> lock(io_mutex_);
if (!io_adaptor_) {
return Status::Invalid("I/O adaptor is not initialized");
}
RETURN_ON_ERROR(io_adaptor_->RemoveFile(spill_path_ + std::to_string(id)));
//FileLocker::getInstance().unlockForRead(id);
return Status::OK();
}

Expand Down
1 change: 0 additions & 1 deletion src/server/util/spill_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ class SpillFileReader {
Status Delete_(const ObjectID id);

std::string spill_path_;
std::mutex io_mutex_;
std::unique_ptr<FileIOAdaptor> io_adaptor_ = nullptr;
};

Expand Down

0 comments on commit 63db33c

Please sign in to comment.