Skip to content

Commit

Permalink
Support to spill objects with LRU strategy by vineyardd itself.
Browse files Browse the repository at this point in the history
Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji committed Aug 13, 2024
1 parent e1e1b26 commit 62952e2
Show file tree
Hide file tree
Showing 5 changed files with 428 additions and 15 deletions.
28 changes: 23 additions & 5 deletions src/server/async/socket_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,11 @@ bool SocketConnection::doSealBlob(json const& root) {
ObjectID id;
TRY_READ_REQUEST(ReadSealRequest, root, id);
RESPONSE_ON_ERROR(bulk_store_->Seal(id));
RESPONSE_ON_ERROR(bulk_store_->AddDependency(id, getConnId()));
Status status;
bulk_store_->objects_.find_fn(
id, [self, id, &status](const std::shared_ptr<Payload>& object) {
status = self->bulk_store_->MarkAsCold(id, object);
});
std::string message_out;
WriteSealReply(message_out);
this->doWrite(message_out);
Expand All @@ -568,8 +572,14 @@ bool SocketConnection::doGetBuffers(const json& root) {

TRY_READ_REQUEST(ReadGetBuffersRequest, root, ids, unsafe);
RESPONSE_ON_ERROR(bulk_store_->GetUnsafe(ids, unsafe, objects));
RESPONSE_ON_ERROR(bulk_store_->AddDependency(
std::unordered_set<ObjectID>(ids.begin(), ids.end()), this->getConnId()));
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids, objects));
for (size_t i = 0; i < objects.size(); ++i) {
if (objects[i]->pointer == nullptr) {
VINEYARD_CHECK_OK(
bulk_store_->ReloadColdObject(ids[i], objects[i], false));
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids[i], objects[i]));
}
}

std::vector<int> fd_to_send;
for (auto object : objects) {
Expand Down Expand Up @@ -679,6 +689,7 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) {
ObjectID object_id;
RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object));
RESPONSE_ON_ERROR(bulk_store_->Seal(object_id));
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(object_id, object));

if (use_rdma) {
std::string message_out;
Expand Down Expand Up @@ -734,6 +745,7 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) {
object_ids.emplace_back(object_id);
objects.emplace_back(object);
}
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(object_ids, objects));

if (use_rdma) {
std::string message_out;
Expand Down Expand Up @@ -788,8 +800,14 @@ bool SocketConnection::doGetRemoteBuffers(const json& root) {
use_rdma);
server_ptr_->LockTransmissionObjects(ids);
RESPONSE_ON_ERROR(bulk_store_->GetUnsafe(ids, unsafe, objects));
RESPONSE_ON_ERROR(bulk_store_->AddDependency(
std::unordered_set<ObjectID>(ids.begin(), ids.end()), this->getConnId()));
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids, objects));
for (size_t i = 0; i < objects.size(); ++i) {
if (objects[i]->pointer == nullptr) {
VINEYARD_CHECK_OK(
bulk_store_->ReloadColdObject(ids[i], objects[i], false));
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids[i], objects[i]));
}
}
WriteGetBuffersReply(objects, {}, compress, message_out);

if (!use_rdma) {
Expand Down
55 changes: 54 additions & 1 deletion src/server/memory/usage.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ limitations under the License.
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "flat_hash_map/flat_hash_map.hpp"
#include "libcuckoo/cuckoohash_map.hh"
Expand Down Expand Up @@ -220,7 +221,7 @@ class ColdObjectTracker

bool CheckExist(const ID id) const {
std::lock_guard<decltype(mu_)> locked(mu_);
return map_.find(id) != map_.end();
return spilled_obj_.find(id) != spilled_obj_.end();
}

/**
Expand Down Expand Up @@ -330,6 +331,14 @@ class ColdObjectTracker
return status;
}

Status ReloadObject(const ObjectID& object_id,
const std::shared_ptr<Payload>& payload, const bool pin,
const std::shared_ptr<Der>& bulk_store) {
std::lock_guard<decltype(mu_)> locked(mu_);
auto status = this->reload(object_id, payload, pin, bulk_store);
return status;
}

bool CheckSpilled(const ID& id) {
std::lock_guard<decltype(mu_)> locked(mu_);
return spilled_obj_.find(id) != spilled_obj_.end();
Expand Down Expand Up @@ -426,6 +435,20 @@ class ColdObjectTracker
return Status::OK();
}

/**
* @brief Add a blob list to the cold object list
*/
Status MarkAsCold(const std::vector<ID>& ids,
const std::vector<std::shared_ptr<P>>& payloads) {
for (size_t i = 0; i < payloads.size(); i++) {
const std::shared_ptr<P>& payload = payloads[i];
if (payload->IsSealed()) {
cold_obj_lru_.Ref(ids[i], payload);
}
}
return Status::OK();
}

/**
* @brief check if a blob is in-use. Return true if it is in-use.
*/
Expand Down Expand Up @@ -479,6 +502,20 @@ class ColdObjectTracker
return cold_obj_lru_.SpillObjects(objects, shared_from_self());
}

/**
* @brief Triggered when been requested to spill specified object to disk.
* @param object reloaded blob
*/
Status ReloadColdObject(const ObjectID& object_id,
const std::shared_ptr<Payload>& payload,
const bool pin) {
if (spill_path_.empty()) {
return Status::OK(); // bypass, as spill is not enabled
}
return cold_obj_lru_.ReloadObject(object_id, payload, pin,
shared_from_self());
}

/**
* @brief Triggered when been requested to spill specified objects to disk.
* @param objects reloaded blobs
Expand Down Expand Up @@ -577,10 +614,26 @@ class ColdObjectTracker
if (!payload->is_spilled) {
return Status::ObjectNotSpilled(payload->object_id);
}
std::unique_lock<std::mutex> locked(spill_mu_);

int64_t min_spill_size = 0;
if (payload->data_size >
(BulkAllocator::GetFootprintLimit() - BulkAllocator::Allocated())) {
min_spill_size =
payload->data_size -
(BulkAllocator::GetFootprintLimit() - BulkAllocator::Allocated());
}
if (BulkAllocator::Allocated() > self().mem_spill_lower_bound_) {
min_spill_size =
std::max(min_spill_size,
BulkAllocator::Allocated() - self().mem_spill_lower_bound_);
}
RETURN_ON_ERROR(SpillColdObjectFor(min_spill_size));
{
io::SpillFileReader reader(spill_path_);
RETURN_ON_ERROR(reader.Read(payload, shared_from_self()));
}
payload->is_spilled = false;
return this->DeletePayloadFile(id);
}

Expand Down
Loading

0 comments on commit 62952e2

Please sign in to comment.