Prevent objects that are being migrated or accessed over IPC from being spilled.
Also fix a bug where BufferSet::Extend did not update the blob buffer_ids_ set.

Signed-off-by: vegetableysm <[email protected]>
vegetableysm committed Aug 19, 2024
1 parent 63db33c commit 2c51172
Showing 6 changed files with 377 additions and 308 deletions.
1 change: 1 addition & 0 deletions src/client/ds/blob.cc
@@ -380,6 +380,7 @@ Status BufferSet::EmplaceBuffer(ObjectID const id,
void BufferSet::Extend(BufferSet const& others) {
for (auto const& kv : others.buffers_) {
buffers_.emplace(kv.first, kv.second);
buffer_ids_.emplace(kv.first);
}
}

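A quick illustration of the fix above: BufferSet keeps both a buffer map and a separate id set, and Extend previously copied entries only into buffers_, leaving buffer_ids_ out of sync. The following minimal sketch (a hypothetical MiniBufferSet, not vineyard's actual BufferSet API) shows the invariant the added line restores, namely that every id placed in buffers_ is also recorded in buffer_ids_:

#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <set>

using ObjectID = uint64_t;
struct Buffer {};  // stand-in for the real payload type

class MiniBufferSet {
 public:
  void Emplace(ObjectID id, std::shared_ptr<Buffer> buf) {
    buffers_.emplace(id, std::move(buf));
    buffer_ids_.emplace(id);
  }

  // Merging another set must update *both* containers; omitting the
  // buffer_ids_ insertion here is exactly the bookkeeping bug fixed above.
  void Extend(const MiniBufferSet& other) {
    for (const auto& kv : other.buffers_) {
      buffers_.emplace(kv.first, kv.second);
      buffer_ids_.emplace(kv.first);
    }
  }

  // Invariant check: the id set mirrors the buffer map exactly.
  bool Consistent() const {
    if (buffers_.size() != buffer_ids_.size()) {
      return false;
    }
    for (const auto& kv : buffers_) {
      if (buffer_ids_.count(kv.first) == 0) {
        return false;
      }
    }
    return true;
  }

 private:
  std::map<ObjectID, std::shared_ptr<Buffer>> buffers_;
  std::set<ObjectID> buffer_ids_;
};

int main() {
  MiniBufferSet a, b;
  a.Emplace(1, std::make_shared<Buffer>());
  b.Emplace(2, std::make_shared<Buffer>());
  a.Extend(b);
  std::cout << std::boolalpha << a.Consistent() << std::endl;  // prints: true
  return 0;
}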
44 changes: 25 additions & 19 deletions src/server/async/socket_server.cc
@@ -447,7 +447,13 @@ bool SocketConnection::doCreateBuffers(const json& root) {
for (auto const& size : sizes) {
ObjectID object_id;
std::shared_ptr<Payload> object;
RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object));
Status status = bulk_store_->Create(size, object_id, object);
if (!status.ok()) {
for (auto const& object : objects) {
bulk_store_->Delete(object->id());
}
RESPONSE_ON_ERROR(status);
}
object_ids.emplace_back(object_id);
objects.emplace_back(object);
}
@@ -552,11 +558,7 @@ bool SocketConnection::doSealBlob(json const& root) {
ObjectID id;
TRY_READ_REQUEST(ReadSealRequest, root, id);
RESPONSE_ON_ERROR(bulk_store_->Seal(id));
Status status;
bulk_store_->objects_.find_fn(
id, [self, id, &status](const std::shared_ptr<Payload>& object) {
status = self->bulk_store_->MarkAsCold(id, object);
});
RESPONSE_ON_ERROR(bulk_store_->AddDependency(id, getConnId()));
std::string message_out;
WriteSealReply(message_out);
this->doWrite(message_out);
@@ -572,12 +574,12 @@ bool SocketConnection::doGetBuffers(const json& root) {

TRY_READ_REQUEST(ReadGetBuffersRequest, root, ids, unsafe);
RESPONSE_ON_ERROR(bulk_store_->GetUnsafe(ids, unsafe, objects));
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids, objects));
RESPONSE_ON_ERROR(bulk_store_->AddDependency(
std::unordered_set<ObjectID>(ids.begin(), ids.end()), this->getConnId()));
for (size_t i = 0; i < objects.size(); ++i) {
if (objects[i]->pointer == nullptr) {
VINEYARD_CHECK_OK(
RESPONSE_ON_ERROR(
bulk_store_->ReloadColdObject(ids[i], objects[i], false));
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids[i], objects[i]));
}
}

@@ -689,7 +691,7 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) {
ObjectID object_id;
RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object));
RESPONSE_ON_ERROR(bulk_store_->Seal(object_id));
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(object_id, object));
RESPONSE_ON_ERROR(bulk_store_->AddDependency(object_id, this->getConnId()));

if (use_rdma) {
std::string message_out;
@@ -701,6 +703,8 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) {
ReceiveRemoteBuffers(
socket_, {object}, compress,
[self, object](const Status& status) -> Status {
self->bulk_store_->RemoveDependency(object->object_id,
self->getConnId());
std::string message_out;
if (status.ok()) {
WriteCreateBufferReply(object->object_id, object, -1,
@@ -742,10 +746,10 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) {
std::shared_ptr<Payload> object;
RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object));
RESPONSE_ON_ERROR(bulk_store_->Seal(object_id));
RESPONSE_ON_ERROR(bulk_store_->AddDependency(object_id, this->getConnId()));
object_ids.emplace_back(object_id);
objects.emplace_back(object);
}
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(object_ids, objects));

if (use_rdma) {
std::string message_out;
@@ -758,6 +762,10 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) {
ReceiveRemoteBuffers(
socket_, objects, compress,
[self, object_ids, objects](const Status& status) -> Status {
self->bulk_store_->RemoveDependency(
std::unordered_set<ObjectID>(object_ids.begin(),
object_ids.end()),
self->getConnId());
std::string message_out;
if (status.ok()) {
WriteCreateBuffersReply(object_ids, objects, std::vector<int>{},
@@ -800,14 +808,8 @@ bool SocketConnection::doGetRemoteBuffers(const json& root) {
use_rdma);
server_ptr_->LockTransmissionObjects(ids);
RESPONSE_ON_ERROR(bulk_store_->GetUnsafe(ids, unsafe, objects));
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids, objects));
for (size_t i = 0; i < objects.size(); ++i) {
if (objects[i]->pointer == nullptr) {
VINEYARD_CHECK_OK(
bulk_store_->ReloadColdObject(ids[i], objects[i], false));
VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids[i], objects[i]));
}
}
RESPONSE_ON_ERROR(bulk_store_->AddDependency(
std::unordered_set<ObjectID>(ids.begin(), ids.end()), this->getConnId()));
WriteGetBuffersReply(objects, {}, compress, message_out);

if (!use_rdma) {
@@ -823,6 +825,8 @@
self->server_ptr_->UnlockTransmissionObjects(ids);
return Status::OK();
});
std::unordered_set<ObjectID> ids_set(ids.begin(), ids.end());
self->bulk_store_->RemoveDependency(ids_set, self->getConnId());
return Status::OK();
});
} else {
@@ -1866,6 +1870,8 @@ bool SocketConnection::doReleaseBlobsWithRDMA(const json& root) {

boost::asio::post(server_ptr_->GetIOContext(), [self, ids]() {
self->server_ptr_->UnlockTransmissionObjects(ids);
std::unordered_set<ObjectID> id_set(ids.begin(), ids.end());
self->bulk_store_->RemoveDependency(id_set, self->getConnId());
std::string message_out;
WriteReleaseBlobsWithRDMAReply(message_out);
self->doWrite(message_out);
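The recurring pattern in this file: instead of marking blobs cold immediately (the removed MarkAsCold calls), each access or transfer now registers a dependency between the blob and the connection (AddDependency) and releases it in the completion callback or on the release path (RemoveDependency), so a blob that is still being read or transmitted is never treated as spillable. Below is a standalone sketch of that bookkeeping, using a hypothetical DependencyTracker; the real methods live on the bulk store and also accept sets of ids.

#include <cstdint>
#include <iostream>
#include <mutex>
#include <unordered_map>
#include <unordered_set>

using ObjectID = uint64_t;

class DependencyTracker {
 public:
  // Called when a connection starts using a blob (create/seal/get paths).
  void AddDependency(ObjectID id, int conn_id) {
    std::lock_guard<std::mutex> lock(mu_);
    deps_[id].insert(conn_id);
  }

  // Called from completion callbacks or when the connection goes away.
  void RemoveDependency(ObjectID id, int conn_id) {
    std::lock_guard<std::mutex> lock(mu_);
    auto it = deps_.find(id);
    if (it == deps_.end()) {
      return;
    }
    it->second.erase(conn_id);
    if (it->second.empty()) {
      deps_.erase(it);
    }
  }

  // The spill/migration path consults this before evicting a blob.
  bool IsInUse(ObjectID id) const {
    std::lock_guard<std::mutex> lock(mu_);
    return deps_.count(id) > 0;
  }

 private:
  mutable std::mutex mu_;
  std::unordered_map<ObjectID, std::unordered_set<int>> deps_;
};

int main() {
  DependencyTracker tracker;
  tracker.AddDependency(42, /*conn_id=*/7);
  std::cout << std::boolalpha << tracker.IsInUse(42) << std::endl;  // true
  tracker.RemoveDependency(42, 7);
  std::cout << std::boolalpha << tracker.IsInUse(42) << std::endl;  // false
  return 0;
}

Note how, in the diff above, the RemoveDependency calls are placed inside the ReceiveRemoteBuffers and RDMA release callbacks, so the pin outlives the asynchronous transfer rather than ending when the request handler returns.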
80 changes: 49 additions & 31 deletions src/server/memory/usage.h
@@ -208,9 +208,9 @@ class ColdObjectTracker
~LRU() = default;

void Ref(const ID id, const std::shared_ptr<P>& payload) {
std::lock_guard<decltype(mu_)> locked(mu_);
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
std::cout << "Thread " << std::this_thread::get_id()
<< " in Ref, RefPayload:" << id << std::endl;
<< " in Ref, RefPayload:" << id << std::endl;
auto it = ref_list_iter_map_.find(id);
if (it == ref_list_iter_map_.end()) {
ref_list_.emplace_front(id, payload);
@@ -223,7 +223,7 @@ }
}

bool CheckIsSpilled(const ID id) const {
std::lock_guard<decltype(mu_)> locked(mu_);
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
return spilled_obj_.find(id) != spilled_obj_.end();
}

@@ -238,7 +238,7 @@
*/
Status Unref(const ID id, const bool fast_delete,
const std::shared_ptr<Der>& bulk_store) {
std::lock_guard<decltype(mu_)> locked(mu_);
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
auto it = ref_list_iter_map_.find(id);
if (it == ref_list_iter_map_.end()) {
auto spilled = spilled_obj_.find(id);
@@ -249,10 +249,15 @@
// NB: explicitly copy the std::shared_ptr as the iterator is not
// stable.
auto payload = spilled->second;
payload->pointer = bulk_store->AllocateMemoryWithSpill(payload->data_size,
&payload->store_fd,
&payload->map_size,
&payload->data_offset);
payload->pointer = bulk_store->AllocateMemoryWithSpill(
payload->data_size, &payload->store_fd, &payload->map_size,
&payload->data_offset);
if (payload->pointer == nullptr) {
return Status::NotEnoughMemory(
"Failed to allocate memory of size " +
std::to_string(payload->data_size) +
" while unref spilling file");
}
RETURN_ON_ERROR(bulk_store->ReloadPayload(id, payload));
} else {
RETURN_ON_ERROR(bulk_store->DeletePayloadFile(id));
@@ -267,13 +272,14 @@ }
}

Status SpillFor(const size_t sz, const std::shared_ptr<Der>& bulk_store) {
std::lock_guard<decltype(mu_)> locked(mu_);
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
size_t spilled_sz = 0;
auto status = Status::OK();
auto it = ref_list_.rbegin();
std::map<ObjectID, std::shared_ptr<Payload>> pinned_objects;
std::cout << "Thread " << std::this_thread::get_id()
<< " in SpillFor, ref_list_ size:" << ref_list_.size() << std::endl;
<< " in SpillFor, ref_list_ size:" << ref_list_.size()
<< std::endl;
while (it != ref_list_.rend()) {
if (it->second->IsPinned()) {
// bypass pinned
@@ -287,7 +293,8 @@ class ColdObjectTracker
auto s = this->spill(it->first, it->second, bulk_store);
if (s.ok()) {
std::cout << "Thread " << std::this_thread::get_id()
<< ", SpillPayload :" << it->first << " success" << std::endl;
<< ", SpillPayload :" << it->first << " success"
<< std::endl;
spilled_sz += it->second->data_size;
ref_list_iter_map_.erase(it->first);
} else if (s.IsObjectSpilled()) {
@@ -324,7 +331,7 @@ class ColdObjectTracker
Status SpillObjects(
const std::map<ObjectID, std::shared_ptr<Payload>>& objects,
const std::shared_ptr<Der>& bulk_store) {
std::lock_guard<decltype(mu_)> locked(mu_);
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
auto status = Status::OK();
for (auto const& item : objects) {
if (item.second->IsPinned()) {
@@ -339,7 +346,7 @@ class ColdObjectTracker
Status ReloadObjects(
const std::map<ObjectID, std::shared_ptr<Payload>>& objects,
const bool pin, const std::shared_ptr<Der>& bulk_store) {
std::lock_guard<decltype(mu_)> locked(mu_);
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
auto status = Status::OK();
for (auto const& item : objects) {
status += this->reload(item.first, item.second, pin, bulk_store);
@@ -350,21 +357,21 @@ class ColdObjectTracker
Status ReloadObject(const ObjectID& object_id,
const std::shared_ptr<Payload>& payload, const bool pin,
const std::shared_ptr<Der>& bulk_store) {
std::lock_guard<decltype(mu_)> locked(mu_);
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
auto status = this->reload(object_id, payload, pin, bulk_store);
return status;
}

bool CheckSpilled(const ID& id) {
std::lock_guard<decltype(mu_)> locked(mu_);
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
return spilled_obj_.find(id) != spilled_obj_.end();
}

private:
Status spill(const ObjectID object_id,
const std::shared_ptr<Payload>& payload,
const std::shared_ptr<Der>& bulk_store) {
std::lock_guard<decltype(mu_)> locked(mu_);
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
if (payload->is_spilled) {
return Status::ObjectSpilled(object_id);
}
@@ -377,7 +384,7 @@ class ColdObjectTracker
Status reload(const ObjectID object_id,
const std::shared_ptr<Payload>& payload, const bool pin,
const std::shared_ptr<Der>& bulk_store) {
std::lock_guard<decltype(mu_)> locked(mu_);
std::lock_guard<decltype(lru_field_mu_)> locked(lru_field_mu_);
if (pin) {
payload->Pin();
}
@@ -395,8 +402,8 @@ class ColdObjectTracker
return bulk_store->ReloadPayload(object_id, payload);
}

mutable std::recursive_mutex mu_;
// protected by mu_
mutable std::recursive_mutex lru_field_mu_;
// protected by lru_field_mu_
lru_map_t ref_list_iter_map_;
lru_list_t ref_list_;
ska::flat_hash_map<ID, std::shared_ptr<P>> spilled_obj_;
@@ -533,15 +540,16 @@ class ColdObjectTracker
if (spill_path_.empty() || payload->data_size == 0) {
return Status::OK(); // bypass, as spill is not enabled
}
payload->pointer = AllocateMemoryWithSpill(payload->data_size, &payload->store_fd,
&payload->map_size, &payload->data_offset);
payload->pointer =
AllocateMemoryWithSpill(payload->data_size, &payload->store_fd,
&payload->map_size, &payload->data_offset);
if (payload->pointer == nullptr) {
return Status::NotEnoughMemory("Failed to allocate memory of size " +
std::to_string(payload->data_size) +
" while reload spilling file");
}
Status status = cold_obj_lru_.ReloadObject(object_id, payload, pin,
shared_from_self());
Status status =
cold_obj_lru_.ReloadObject(object_id, payload, pin, shared_from_self());
if (!status.ok()) {
BulkAllocator::Free(payload->pointer, payload->data_size);
payload->pointer = nullptr;
@@ -573,12 +581,21 @@
*/
uint8_t* AllocateMemoryWithSpill(const size_t size, int* fd,
int64_t* map_size, ptrdiff_t* offset) {
// std::lock_guard<decltype(cold_obj_lru_.mu_)> locked(cold_obj_lru_.mu_);
/*
* FIXME:
* Because the sequence of getting spill lock and lru mu_ lock is
* non-deterministic, we use the same lock to protect both of them
* to avoid deadlock. Maybe there exists a better solution.
*/
std::lock_guard<decltype(this->cold_obj_lru_.lru_field_mu_)> locked(
this->cold_obj_lru_.lru_field_mu_);
uint8_t* pointer = nullptr;
std::cout << "Thread " << std::this_thread::get_id()
<< " before AllocateMemoryWithSpill;" << "size:" << size
<< "BulkAllocator::GetFootprintLimit():" << BulkAllocator::GetFootprintLimit()
<< "BulkAllocator::Allocated():" << BulkAllocator::Allocated() << std::endl;
<< " before AllocateMemoryWithSpill;"
<< "size:" << size << "BulkAllocator::GetFootprintLimit():"
<< BulkAllocator::GetFootprintLimit()
<< "BulkAllocator::Allocated():" << BulkAllocator::Allocated()
<< std::endl;
pointer = self().AllocateMemory(size, fd, map_size, offset);
// no spill will be conducted
if (spill_path_.empty()) {
@@ -591,13 +608,14 @@ class ColdObjectTracker
// 2. memory usage is above upper bound
if (pointer == nullptr ||
BulkAllocator::Allocated() >= self().mem_spill_upper_bound_) {
std::lock_guard<std::mutex> locked(spill_mu_);
int64_t min_spill_size = 0;
if (pointer == nullptr) {
std::cout << "Thread " << std::this_thread::get_id()
<< "pointer is nullptr;" << "size:" << size
<< "BulkAllocator::GetFootprintLimit():" << BulkAllocator::GetFootprintLimit()
<< "BulkAllocator::Allocated():" << BulkAllocator::Allocated() << std::endl;
<< "pointer is nullptr;"
<< "size:" << size << "BulkAllocator::GetFootprintLimit():"
<< BulkAllocator::GetFootprintLimit()
<< "BulkAllocator::Allocated():" << BulkAllocator::Allocated()
<< std::endl;
min_spill_size = size - (BulkAllocator::GetFootprintLimit() -
BulkAllocator::Allocated());
}
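The FIXME comment in AllocateMemoryWithSpill above explains the locking change in this file: the spill lock and the LRU mutex could previously be acquired in different orders on different paths, so both critical sections are now serialized on the single recursive mutex lru_field_mu_ (the renamed mu_). A toy sketch of that design choice, with a hypothetical MiniStore rather than the real ColdObjectTracker:

#include <cstddef>
#include <iostream>
#include <list>
#include <mutex>

struct MiniStore {
  // One recursive mutex guards both LRU bookkeeping and the spill decision.
  std::recursive_mutex lru_field_mu;
  std::list<int> lru;  // cold objects, most recently used at the front

  // Spill path: walks the LRU list from the cold end under the shared lock.
  void SpillFor(std::size_t need) {
    std::lock_guard<std::recursive_mutex> guard(lru_field_mu);
    while (need > 0 && !lru.empty()) {
      std::cout << "spilling object " << lru.back() << std::endl;
      lru.pop_back();
      --need;
    }
  }

  // Allocation path: takes the same lock and may fall back to spilling.
  // Because both paths acquire the same (recursive) mutex, the old
  // "LRU lock vs. spill lock" ordering problem cannot occur.
  void AllocateWithSpill(std::size_t need) {
    std::lock_guard<std::recursive_mutex> guard(lru_field_mu);
    SpillFor(need);
  }
};

int main() {
  MiniStore store;
  store.lru = {1, 2, 3};
  store.AllocateWithSpill(2);  // spills objects 3 and 2
  return 0;
}

The cost is coarser locking, since allocation and spilling no longer proceed concurrently; the FIXME acknowledges this by noting that a better solution may exist.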
25 changes: 16 additions & 9 deletions src/server/util/spill_file.cc
@@ -44,17 +44,20 @@ Status SpillFileWriter::Write(const std::shared_ptr<Payload>& payload) {
return Status::IOError("Can't open io_adaptor");
}
RETURN_ON_ERROR(io_adaptor_->Open("w"));
std::cout << "Thread " << std::this_thread::get_id() << " writing object_id: " << payload->object_id << std::endl;
std::cout << "Thread " << std::this_thread::get_id()
<< " writing object_id: " << payload->object_id << std::endl;
RETURN_ON_ERROR(
io_adaptor_->Write(reinterpret_cast<char*>(&(payload->object_id)),
sizeof(payload->object_id)));
std::cout << "Thread " << std::this_thread::get_id() << " writing data_size: " << payload->data_size << std::endl;
std::cout << "Thread " << std::this_thread::get_id()
<< " writing data_size: " << payload->data_size << std::endl;
RETURN_ON_ERROR(
io_adaptor_->Write(reinterpret_cast<char*>(&(payload->data_size)),
sizeof(payload->data_size)));
std::cout << "Thread " << std::this_thread::get_id() << " writing content: " << payload->pointer << std::endl;
auto status = io_adaptor_->Write(reinterpret_cast<const char*>(payload->pointer),
payload->data_size);
std::cout << "Thread " << std::this_thread::get_id()
<< " writing content: " << payload->pointer << std::endl;
auto status = io_adaptor_->Write(
reinterpret_cast<const char*>(payload->pointer), payload->data_size);
return status;
}

@@ -84,7 +87,8 @@ Status SpillFileReader::Read(const std::shared_ptr<Payload>& payload,
RETURN_ON_ERROR(io_adaptor_->Open());
{
ObjectID object_id = InvalidObjectID();
std::cout << "Thread " << std::this_thread::get_id() << " reading object_id: " << payload->object_id << std::endl;
std::cout << "Thread " << std::this_thread::get_id()
<< " reading object_id: " << payload->object_id << std::endl;
RETURN_ON_ERROR(io_adaptor_->Read(&object_id, sizeof(object_id)));
if (payload->object_id != object_id) {
return Status::IOError(
@@ -94,17 +98,20 @@ }
}
{
int64_t data_size = std::numeric_limits<int64_t>::min();
std::cout << "Thread " << std::this_thread::get_id() << " reading data_size: " << data_size << std::endl;
std::cout << "Thread " << std::this_thread::get_id()
<< " reading data_size: " << data_size << std::endl;
RETURN_ON_ERROR(io_adaptor_->Read(&data_size, sizeof(data_size)));
if (payload->data_size != data_size) {
return Status::IOError(
"Incorrect 'data_size': opening wrong file: " + spill_path_ +
ObjectIDToString(payload->object_id));
}
}
std::cout << "Thread " << std::this_thread::get_id() << " reading content: " << payload->pointer << std::endl;
std::cout << "Thread " << std::this_thread::get_id()
<< " reading content: " << payload->pointer << std::endl;
RETURN_ON_ERROR(io_adaptor_->Read(payload->pointer, payload->data_size));
std::cout << "Thread " << std::this_thread::get_id() << " is deleting object_id: " << payload->object_id << std::endl;
std::cout << "Thread " << std::this_thread::get_id()
<< " is deleting object_id: " << payload->object_id << std::endl;
RETURN_ON_ERROR(Delete_(payload->object_id));
io_adaptor_ = nullptr;
return Status::OK();
