From 2c51172f7a6841a503397e49986f146d6426f3cd Mon Sep 17 00:00:00 2001 From: vegetableysm Date: Mon, 19 Aug 2024 16:22:22 +0800 Subject: [PATCH] Prevent spilling object from migrating or ipc accessing. Fix bug of blob buffer_ids_. Signed-off-by: vegetableysm --- src/client/ds/blob.cc | 1 + src/server/async/socket_server.cc | 44 +-- src/server/memory/usage.h | 80 +++-- src/server/util/spill_file.cc | 25 +- src/server/util/spill_file.h | 54 ++-- test/concurrent_lru_spill_test.cc | 481 ++++++++++++++++-------------- 6 files changed, 377 insertions(+), 308 deletions(-) diff --git a/src/client/ds/blob.cc b/src/client/ds/blob.cc index 057d76f7a7..86d6eb02b4 100644 --- a/src/client/ds/blob.cc +++ b/src/client/ds/blob.cc @@ -380,6 +380,7 @@ Status BufferSet::EmplaceBuffer(ObjectID const id, void BufferSet::Extend(BufferSet const& others) { for (auto const& kv : others.buffers_) { buffers_.emplace(kv.first, kv.second); + buffer_ids_.emplace(kv.first); } } diff --git a/src/server/async/socket_server.cc b/src/server/async/socket_server.cc index 9e17b65215..4d4fdd85a7 100644 --- a/src/server/async/socket_server.cc +++ b/src/server/async/socket_server.cc @@ -447,7 +447,13 @@ bool SocketConnection::doCreateBuffers(const json& root) { for (auto const& size : sizes) { ObjectID object_id; std::shared_ptr object; - RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object)); + Status status = bulk_store_->Create(size, object_id, object); + if (!status.ok()) { + for (auto const& object : objects) { + bulk_store_->Delete(object->id()); + } + RESPONSE_ON_ERROR(status); + } object_ids.emplace_back(object_id); objects.emplace_back(object); } @@ -552,11 +558,7 @@ bool SocketConnection::doSealBlob(json const& root) { ObjectID id; TRY_READ_REQUEST(ReadSealRequest, root, id); RESPONSE_ON_ERROR(bulk_store_->Seal(id)); - Status status; - bulk_store_->objects_.find_fn( - id, [self, id, &status](const std::shared_ptr& object) { - status = self->bulk_store_->MarkAsCold(id, object); - }); + RESPONSE_ON_ERROR(bulk_store_->AddDependency(id, getConnId())); std::string message_out; WriteSealReply(message_out); this->doWrite(message_out); @@ -572,12 +574,12 @@ bool SocketConnection::doGetBuffers(const json& root) { TRY_READ_REQUEST(ReadGetBuffersRequest, root, ids, unsafe); RESPONSE_ON_ERROR(bulk_store_->GetUnsafe(ids, unsafe, objects)); - VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids, objects)); + RESPONSE_ON_ERROR(bulk_store_->AddDependency( + std::unordered_set(ids.begin(), ids.end()), this->getConnId())); for (size_t i = 0; i < objects.size(); ++i) { if (objects[i]->pointer == nullptr) { - VINEYARD_CHECK_OK( + RESPONSE_ON_ERROR( bulk_store_->ReloadColdObject(ids[i], objects[i], false)); - VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids[i], objects[i])); } } @@ -689,7 +691,7 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) { ObjectID object_id; RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object)); RESPONSE_ON_ERROR(bulk_store_->Seal(object_id)); - VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(object_id, object)); + RESPONSE_ON_ERROR(bulk_store_->AddDependency(object_id, this->getConnId())); if (use_rdma) { std::string message_out; @@ -701,6 +703,8 @@ bool SocketConnection::doCreateRemoteBuffer(const json& root) { ReceiveRemoteBuffers( socket_, {object}, compress, [self, object](const Status& status) -> Status { + self->bulk_store_->RemoveDependency(object->object_id, + self->getConnId()); std::string message_out; if (status.ok()) { WriteCreateBufferReply(object->object_id, object, -1, @@ -742,10 +746,10 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) { std::shared_ptr object; RESPONSE_ON_ERROR(bulk_store_->Create(size, object_id, object)); RESPONSE_ON_ERROR(bulk_store_->Seal(object_id)); + RESPONSE_ON_ERROR(bulk_store_->AddDependency(object_id, this->getConnId())); object_ids.emplace_back(object_id); objects.emplace_back(object); } - VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(object_ids, objects)); if (use_rdma) { std::string message_out; @@ -758,6 +762,10 @@ bool SocketConnection::doCreateRemoteBuffers(const json& root) { ReceiveRemoteBuffers( socket_, objects, compress, [self, object_ids, objects](const Status& status) -> Status { + self->bulk_store_->RemoveDependency( + std::unordered_set(object_ids.begin(), + object_ids.end()), + self->getConnId()); std::string message_out; if (status.ok()) { WriteCreateBuffersReply(object_ids, objects, std::vector{}, @@ -800,14 +808,8 @@ bool SocketConnection::doGetRemoteBuffers(const json& root) { use_rdma); server_ptr_->LockTransmissionObjects(ids); RESPONSE_ON_ERROR(bulk_store_->GetUnsafe(ids, unsafe, objects)); - VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids, objects)); - for (size_t i = 0; i < objects.size(); ++i) { - if (objects[i]->pointer == nullptr) { - VINEYARD_CHECK_OK( - bulk_store_->ReloadColdObject(ids[i], objects[i], false)); - VINEYARD_CHECK_OK(bulk_store_->MarkAsCold(ids[i], objects[i])); - } - } + RESPONSE_ON_ERROR(bulk_store_->AddDependency( + std::unordered_set(ids.begin(), ids.end()), this->getConnId())); WriteGetBuffersReply(objects, {}, compress, message_out); if (!use_rdma) { @@ -823,6 +825,8 @@ bool SocketConnection::doGetRemoteBuffers(const json& root) { self->server_ptr_->UnlockTransmissionObjects(ids); return Status::OK(); }); + std::unordered_set ids_set(ids.begin(), ids.end()); + self->bulk_store_->RemoveDependency(ids_set, self->getConnId()); return Status::OK(); }); } else { @@ -1866,6 +1870,8 @@ bool SocketConnection::doReleaseBlobsWithRDMA(const json& root) { boost::asio::post(server_ptr_->GetIOContext(), [self, ids]() { self->server_ptr_->UnlockTransmissionObjects(ids); + std::unordered_set id_set(ids.begin(), ids.end()); + self->bulk_store_->RemoveDependency(id_set, self->getConnId()); std::string message_out; WriteReleaseBlobsWithRDMAReply(message_out); self->doWrite(message_out); diff --git a/src/server/memory/usage.h b/src/server/memory/usage.h index d5a45398d0..bd0c9cef6a 100644 --- a/src/server/memory/usage.h +++ b/src/server/memory/usage.h @@ -208,9 +208,9 @@ class ColdObjectTracker ~LRU() = default; void Ref(const ID id, const std::shared_ptr

& payload) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); std::cout << "Thread " << std::this_thread::get_id() - << " in Ref, RefPayload:" << id << std::endl; + << " in Ref, RefPayload:" << id << std::endl; auto it = ref_list_iter_map_.find(id); if (it == ref_list_iter_map_.end()) { ref_list_.emplace_front(id, payload); @@ -223,7 +223,7 @@ class ColdObjectTracker } bool CheckIsSpilled(const ID id) const { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); return spilled_obj_.find(id) != spilled_obj_.end(); } @@ -238,7 +238,7 @@ class ColdObjectTracker */ Status Unref(const ID id, const bool fast_delete, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); auto it = ref_list_iter_map_.find(id); if (it == ref_list_iter_map_.end()) { auto spilled = spilled_obj_.find(id); @@ -249,10 +249,15 @@ class ColdObjectTracker // NB: explicitly copy the std::shared_ptr as the iterator is not // stable. auto payload = spilled->second; - payload->pointer = bulk_store->AllocateMemoryWithSpill(payload->data_size, - &payload->store_fd, - &payload->map_size, - &payload->data_offset); + payload->pointer = bulk_store->AllocateMemoryWithSpill( + payload->data_size, &payload->store_fd, &payload->map_size, + &payload->data_offset); + if (payload->pointer == nullptr) { + return Status::NotEnoughMemory( + "Failed to allocate memory of size " + + std::to_string(payload->data_size) + + " while unref spilling file"); + } RETURN_ON_ERROR(bulk_store->ReloadPayload(id, payload)); } else { RETURN_ON_ERROR(bulk_store->DeletePayloadFile(id)); @@ -267,13 +272,14 @@ class ColdObjectTracker } Status SpillFor(const size_t sz, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); size_t spilled_sz = 0; auto status = Status::OK(); auto it = ref_list_.rbegin(); std::map> pinned_objects; std::cout << "Thread " << std::this_thread::get_id() - << " in SpillFor, ref_list_ size:" << ref_list_.size() << std::endl; + << " in SpillFor, ref_list_ size:" << ref_list_.size() + << std::endl; while (it != ref_list_.rend()) { if (it->second->IsPinned()) { // bypass pinned @@ -287,7 +293,8 @@ class ColdObjectTracker auto s = this->spill(it->first, it->second, bulk_store); if (s.ok()) { std::cout << "Thread " << std::this_thread::get_id() - << ", SpillPayload :" << it->first << " success" << std::endl; + << ", SpillPayload :" << it->first << " success" + << std::endl; spilled_sz += it->second->data_size; ref_list_iter_map_.erase(it->first); } else if (s.IsObjectSpilled()) { @@ -324,7 +331,7 @@ class ColdObjectTracker Status SpillObjects( const std::map>& objects, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); auto status = Status::OK(); for (auto const& item : objects) { if (item.second->IsPinned()) { @@ -339,7 +346,7 @@ class ColdObjectTracker Status ReloadObjects( const std::map>& objects, const bool pin, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); auto status = Status::OK(); for (auto const& item : objects) { status += this->reload(item.first, item.second, pin, bulk_store); @@ -350,13 +357,13 @@ class ColdObjectTracker Status ReloadObject(const ObjectID& object_id, const std::shared_ptr& payload, const bool pin, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); auto status = this->reload(object_id, payload, pin, bulk_store); return status; } bool CheckSpilled(const ID& id) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); return spilled_obj_.find(id) != spilled_obj_.end(); } @@ -364,7 +371,7 @@ class ColdObjectTracker Status spill(const ObjectID object_id, const std::shared_ptr& payload, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); if (payload->is_spilled) { return Status::ObjectSpilled(object_id); } @@ -377,7 +384,7 @@ class ColdObjectTracker Status reload(const ObjectID object_id, const std::shared_ptr& payload, const bool pin, const std::shared_ptr& bulk_store) { - std::lock_guard locked(mu_); + std::lock_guard locked(lru_field_mu_); if (pin) { payload->Pin(); } @@ -395,8 +402,8 @@ class ColdObjectTracker return bulk_store->ReloadPayload(object_id, payload); } - mutable std::recursive_mutex mu_; - // protected by mu_ + mutable std::recursive_mutex lru_field_mu_; + // protected by lru_field_mu_ lru_map_t ref_list_iter_map_; lru_list_t ref_list_; ska::flat_hash_map> spilled_obj_; @@ -533,15 +540,16 @@ class ColdObjectTracker if (spill_path_.empty() || payload->data_size == 0) { return Status::OK(); // bypass, as spill is not enabled } - payload->pointer = AllocateMemoryWithSpill(payload->data_size, &payload->store_fd, - &payload->map_size, &payload->data_offset); + payload->pointer = + AllocateMemoryWithSpill(payload->data_size, &payload->store_fd, + &payload->map_size, &payload->data_offset); if (payload->pointer == nullptr) { return Status::NotEnoughMemory("Failed to allocate memory of size " + std::to_string(payload->data_size) + " while reload spilling file"); } - Status status = cold_obj_lru_.ReloadObject(object_id, payload, pin, - shared_from_self()); + Status status = + cold_obj_lru_.ReloadObject(object_id, payload, pin, shared_from_self()); if (!status.ok()) { BulkAllocator::Free(payload->pointer, payload->data_size); payload->pointer = nullptr; @@ -573,12 +581,21 @@ class ColdObjectTracker */ uint8_t* AllocateMemoryWithSpill(const size_t size, int* fd, int64_t* map_size, ptrdiff_t* offset) { - // std::lock_guard locked(cold_obj_lru_.mu_); + /* + * FIXME: + * Because the sequence of getting spill lock and lru mu_ lock is + * non-deterministic, we use the same lock to protect both of them + * to avoid deadlock. Maybe there exists a better solution. + */ + std::lock_guardcold_obj_lru_.lru_field_mu_)> locked( + this->cold_obj_lru_.lru_field_mu_); uint8_t* pointer = nullptr; std::cout << "Thread " << std::this_thread::get_id() - << " before AllocateMemoryWithSpill;" << "size:" << size - << "BulkAllocator::GetFootprintLimit():" << BulkAllocator::GetFootprintLimit() - << "BulkAllocator::Allocated():" << BulkAllocator::Allocated() << std::endl; + << " before AllocateMemoryWithSpill;" + << "size:" << size << "BulkAllocator::GetFootprintLimit():" + << BulkAllocator::GetFootprintLimit() + << "BulkAllocator::Allocated():" << BulkAllocator::Allocated() + << std::endl; pointer = self().AllocateMemory(size, fd, map_size, offset); // no spill will be conducted if (spill_path_.empty()) { @@ -591,13 +608,14 @@ class ColdObjectTracker // 2. memory usage is above upper bound if (pointer == nullptr || BulkAllocator::Allocated() >= self().mem_spill_upper_bound_) { - std::lock_guard locked(spill_mu_); int64_t min_spill_size = 0; if (pointer == nullptr) { std::cout << "Thread " << std::this_thread::get_id() - << "pointer is nullptr;" << "size:" << size - << "BulkAllocator::GetFootprintLimit():" << BulkAllocator::GetFootprintLimit() - << "BulkAllocator::Allocated():" << BulkAllocator::Allocated() << std::endl; + << "pointer is nullptr;" + << "size:" << size << "BulkAllocator::GetFootprintLimit():" + << BulkAllocator::GetFootprintLimit() + << "BulkAllocator::Allocated():" << BulkAllocator::Allocated() + << std::endl; min_spill_size = size - (BulkAllocator::GetFootprintLimit() - BulkAllocator::Allocated()); } diff --git a/src/server/util/spill_file.cc b/src/server/util/spill_file.cc index b29e5a0c9a..fffbc609f1 100644 --- a/src/server/util/spill_file.cc +++ b/src/server/util/spill_file.cc @@ -44,17 +44,20 @@ Status SpillFileWriter::Write(const std::shared_ptr& payload) { return Status::IOError("Can't open io_adaptor"); } RETURN_ON_ERROR(io_adaptor_->Open("w")); - std::cout << "Thread " << std::this_thread::get_id() << " writing object_id: " << payload->object_id << std::endl; + std::cout << "Thread " << std::this_thread::get_id() + << " writing object_id: " << payload->object_id << std::endl; RETURN_ON_ERROR( io_adaptor_->Write(reinterpret_cast(&(payload->object_id)), sizeof(payload->object_id))); - std::cout << "Thread " << std::this_thread::get_id() << " writing data_size: " << payload->data_size << std::endl; + std::cout << "Thread " << std::this_thread::get_id() + << " writing data_size: " << payload->data_size << std::endl; RETURN_ON_ERROR( io_adaptor_->Write(reinterpret_cast(&(payload->data_size)), sizeof(payload->data_size))); - std::cout << "Thread " << std::this_thread::get_id() << " writing content: " << payload->pointer << std::endl; - auto status = io_adaptor_->Write(reinterpret_cast(payload->pointer), - payload->data_size); + std::cout << "Thread " << std::this_thread::get_id() + << " writing content: " << payload->pointer << std::endl; + auto status = io_adaptor_->Write( + reinterpret_cast(payload->pointer), payload->data_size); return status; } @@ -84,7 +87,8 @@ Status SpillFileReader::Read(const std::shared_ptr& payload, RETURN_ON_ERROR(io_adaptor_->Open()); { ObjectID object_id = InvalidObjectID(); - std::cout << "Thread " << std::this_thread::get_id() << " reading object_id: " << payload->object_id << std::endl; + std::cout << "Thread " << std::this_thread::get_id() + << " reading object_id: " << payload->object_id << std::endl; RETURN_ON_ERROR(io_adaptor_->Read(&object_id, sizeof(object_id))); if (payload->object_id != object_id) { return Status::IOError( @@ -94,7 +98,8 @@ Status SpillFileReader::Read(const std::shared_ptr& payload, } { int64_t data_size = std::numeric_limits::min(); - std::cout << "Thread " << std::this_thread::get_id() << " reading data_size: " << data_size << std::endl; + std::cout << "Thread " << std::this_thread::get_id() + << " reading data_size: " << data_size << std::endl; RETURN_ON_ERROR(io_adaptor_->Read(&data_size, sizeof(data_size))); if (payload->data_size != data_size) { return Status::IOError( @@ -102,9 +107,11 @@ Status SpillFileReader::Read(const std::shared_ptr& payload, ObjectIDToString(payload->object_id)); } } - std::cout << "Thread " << std::this_thread::get_id() << " reading content: " << payload->pointer << std::endl; + std::cout << "Thread " << std::this_thread::get_id() + << " reading content: " << payload->pointer << std::endl; RETURN_ON_ERROR(io_adaptor_->Read(payload->pointer, payload->data_size)); - std::cout << "Thread " << std::this_thread::get_id() << " is deleting object_id: " << payload->object_id << std::endl; + std::cout << "Thread " << std::this_thread::get_id() + << " is deleting object_id: " << payload->object_id << std::endl; RETURN_ON_ERROR(Delete_(payload->object_id)); io_adaptor_ = nullptr; return Status::OK(); diff --git a/src/server/util/spill_file.h b/src/server/util/spill_file.h index b011051a49..3db56aeaf8 100644 --- a/src/server/util/spill_file.h +++ b/src/server/util/spill_file.h @@ -17,10 +17,10 @@ limitations under the License. #define SRC_SERVER_UTIL_SPILL_FILE_H_ #include -#include #include -#include +#include #include +#include #include "common/memory/payload.h" #include "common/util/arrow.h" @@ -32,36 +32,36 @@ namespace vineyard { namespace io { class FileLocker { -public: - static FileLocker& getInstance() { - static FileLocker instance; - return instance; - } + public: + static FileLocker& getInstance() { + static FileLocker instance; + return instance; + } - void lockForWrite(const ObjectID& id) { - std::unique_lock mapLock(mapMutex_); - fileLocks_[id].lock(); - } + void lockForWrite(const ObjectID& id) { + std::unique_lock mapLock(mapMutex_); + fileLocks_[id].lock(); + } - void unlockForWrite(const ObjectID& id) { - std::unique_lock mapLock(mapMutex_); - fileLocks_[id].unlock(); - } + void unlockForWrite(const ObjectID& id) { + std::unique_lock mapLock(mapMutex_); + fileLocks_[id].unlock(); + } - void lockForRead(const ObjectID& id) { - std::unique_lock mapLock(mapMutex_); - fileLocks_[id].lock_shared(); - } + void lockForRead(const ObjectID& id) { + std::unique_lock mapLock(mapMutex_); + fileLocks_[id].lock_shared(); + } - void unlockForRead(const ObjectID& id) { - std::unique_lock mapLock(mapMutex_); - fileLocks_[id].unlock_shared(); - } + void unlockForRead(const ObjectID& id) { + std::unique_lock mapLock(mapMutex_); + fileLocks_[id].unlock_shared(); + } -private: - FileLocker() = default; - std::shared_mutex mapMutex_; - std::unordered_map fileLocks_; + private: + FileLocker() = default; + std::shared_mutex mapMutex_; + std::unordered_map fileLocks_; }; /* diff --git a/test/concurrent_lru_spill_test.cc b/test/concurrent_lru_spill_test.cc index 2c3a349421..2b8da33dd8 100644 --- a/test/concurrent_lru_spill_test.cc +++ b/test/concurrent_lru_spill_test.cc @@ -47,249 +47,286 @@ vector InitArray(int size, std::function init_func) { } void ConcurrentPutWithClient(std::string ipc_socket) { - const int array_size = 250; - const int num_objects = 500; - const int num_threads = 10; - auto create_and_seal_array = [&](Client& c) { - auto double_array = InitArray(array_size, [](int i) { return i; }); - ArrayBuilder builder(c, double_array); - auto sealed_array = std::dynamic_pointer_cast>(builder.Seal(c)); - }; + const int array_size = 250; + const int num_objects = 500; + const int num_threads = 10; + auto create_and_seal_array = [&](Client& c) { + auto double_array = InitArray(array_size, [](int i) { return i; }); + ArrayBuilder builder(c, double_array); + auto sealed_array = + std::dynamic_pointer_cast>(builder.Seal(c)); + }; + + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + auto worker = [&]() { Client client; VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - - auto worker = [&]() { - Client client; - VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - for (int i = 0; i < num_objects; ++i) { - create_and_seal_array(client); - } - }; - std::vector threads; - for (int i = 0; i < num_threads; ++i) { - threads.emplace_back(worker); + for (int i = 0; i < num_objects; ++i) { + create_and_seal_array(client); } + }; + std::vector threads; + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker); + } - for (auto& thread : threads) { - thread.join(); - } + for (auto& thread : threads) { + thread.join(); + } - VINEYARD_CHECK_OK(client.Clear()); + VINEYARD_CHECK_OK(client.Clear()); } void ConcurrentGetWithClient(std::string ipc_socket) { - const int array_size = 250; - const int num_objects = 500; - const int num_threads = 10; - auto create_and_seal_array = [&](Client& c) { - auto double_array = InitArray(array_size, [](int i) { return i; }); - ArrayBuilder builder(c, double_array); - auto sealed_array = std::dynamic_pointer_cast>(builder.Seal(c)); - return GetObjectID(sealed_array); - }; + const int array_size = 250; + const int num_objects = 500; + const int num_threads = 10; + auto create_and_seal_array = [&](Client& c) { + auto double_array = InitArray(array_size, [](int i) { return i; }); + ArrayBuilder builder(c, double_array); + auto sealed_array = + std::dynamic_pointer_cast>(builder.Seal(c)); + return GetObjectID(sealed_array); + }; + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + + std::vector objects; + for (int i = 0; i < num_objects * num_threads; i++) { + objects.push_back(create_and_seal_array(client)); + } + + auto worker = [&](std::vector ids) { Client client; VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - - std::vector objects; - for (int i = 0; i < num_objects * num_threads; i++){ - objects.push_back(create_and_seal_array(client)); - } - - auto worker = [&](std::vector ids) { - Client client; - VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - for (int i = 0; i < num_objects * num_threads; ++i) { - std::shared_ptr object; - ObjectID id = ids[i]; - VINEYARD_CHECK_OK(client.GetObject(id, object)); - } - }; - std::vector threads; - for (int i = 0; i < num_threads; ++i) { - threads.emplace_back(worker, objects); + for (int i = 0; i < num_objects * num_threads; ++i) { + std::shared_ptr object; + ObjectID id = ids[i]; + VINEYARD_CHECK_OK(client.GetObject(id, object)); } + }; + std::vector threads; + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker, objects); + } - for (auto& thread : threads) { - thread.join(); - } - VINEYARD_CHECK_OK(client.Clear()); + for (auto& thread : threads) { + thread.join(); + } + VINEYARD_CHECK_OK(client.Clear()); } void ConcurrentPutWithRPCClient(std::string rpc_endpoint) { - const int array_size = 250; - const int num_objects = 500; - const int num_threads = 10; - - auto double_array = InitArray(array_size, [](int i) { return i; }); - auto create_remote_blob = [&](RPCClient& c) { - auto remote_blob_writer = std::make_shared(double_array.size() * sizeof(double)); - std::memcpy(remote_blob_writer->data(), double_array.data(), double_array.size() * sizeof(double)); - ObjectMeta meta; - VINEYARD_CHECK_OK(c.CreateRemoteBlob(remote_blob_writer, meta)); - }; - + const int array_size = 250; + const int num_objects = 500; + const int num_threads = 10; + + auto double_array = InitArray(array_size, [](int i) { return i; }); + auto create_remote_blob = [&](RPCClient& c) { + auto remote_blob_writer = std::make_shared( + double_array.size() * sizeof(double)); + std::memcpy(remote_blob_writer->data(), double_array.data(), + double_array.size() * sizeof(double)); + ObjectMeta meta; + VINEYARD_CHECK_OK(c.CreateRemoteBlob(remote_blob_writer, meta)); + }; + + RPCClient rpc_client; + VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); + + auto worker = [&]() { RPCClient rpc_client; VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); - - auto worker = [&]() { - RPCClient rpc_client; - VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); - for (int i = 0; i < num_objects; ++i) { - create_remote_blob(rpc_client); - } - }; - std::vector threads; - for (int i = 0; i < num_threads; ++i) { - threads.emplace_back(worker); + for (int i = 0; i < num_objects; ++i) { + create_remote_blob(rpc_client); } + }; + std::vector threads; + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker); + } - for (auto& thread : threads) { - thread.join(); - } + for (auto& thread : threads) { + thread.join(); + } - VINEYARD_CHECK_OK(rpc_client.Clear()); + VINEYARD_CHECK_OK(rpc_client.Clear()); } void ConcurrentGetWithRPCClient(std::string rpc_endpoint) { - const int array_size = 250; - const int num_objects = 500; - const int num_threads = 10; - - auto double_array = InitArray(array_size, [](int i) { return i; }); - auto create_remote_blob = [&](RPCClient& c) { - auto remote_blob_writer = std::make_shared(double_array.size() * sizeof(double)); - std::memcpy(remote_blob_writer->data(), double_array.data(), double_array.size() * sizeof(double)); - ObjectMeta meta; - VINEYARD_CHECK_OK(c.CreateRemoteBlob(remote_blob_writer, meta)); - return meta.GetId(); - }; - + const int array_size = 250; + const int num_objects = 500; + const int num_threads = 10; + + auto double_array = InitArray(array_size, [](int i) { return i; }); + auto create_remote_blob = [&](RPCClient& c) { + auto remote_blob_writer = std::make_shared( + double_array.size() * sizeof(double)); + std::memcpy(remote_blob_writer->data(), double_array.data(), + double_array.size() * sizeof(double)); + ObjectMeta meta; + VINEYARD_CHECK_OK(c.CreateRemoteBlob(remote_blob_writer, meta)); + return meta.GetId(); + }; + + RPCClient rpc_client; + VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); + + std::vector objects; + for (int i = 0; i < num_objects * num_threads; i++) { + objects.push_back(create_remote_blob(rpc_client)); + } + auto worker = [&](std::vector ids) { RPCClient rpc_client; VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); - - std::vector objects; - for (int i = 0; i < num_objects * num_threads; i++){ - objects.push_back(create_remote_blob(rpc_client)); - } - auto worker = [&](std::vector ids) { - RPCClient rpc_client; - VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); - for (int i = 0; i < num_objects * num_threads; ++i) { - ObjectID id = ids[i]; - rpc_client.GetObject(id); - } - }; - std::vector threads; - for (int i = 0; i < num_threads; ++i) { - threads.emplace_back(worker, objects); + for (int i = 0; i < num_objects * num_threads; ++i) { + ObjectID id = ids[i]; + rpc_client.GetObject(id); } + }; + std::vector threads; + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker, objects); + } - for (auto& thread : threads) { - thread.join(); - } + for (auto& thread : threads) { + thread.join(); + } - VINEYARD_CHECK_OK(rpc_client.Clear()); + VINEYARD_CHECK_OK(rpc_client.Clear()); } void ConcurrentGetAndPut(std::string ipc_socket, std::string rpc_endpoint) { - const int num_threads = 20; - const int num_objects = 500; - const int array_size = 250; - const int initial_objects = 100; - - std::vector object_ids; - std::mutex object_ids_mutex; - - auto create_and_seal_array = [&](Client& c) { - auto double_array = InitArray(array_size, [](int i) { return static_cast(i); }); - ArrayBuilder builder(c, double_array); - auto sealed_array = std::dynamic_pointer_cast>(builder.Seal(c)); - return GetObjectID(sealed_array); - }; - - auto create_remote_blob = [&](RPCClient& c) { - auto double_array = InitArray(array_size, [](int i) { return static_cast(i); }); - auto remote_blob_writer = std::make_shared(double_array.size() * sizeof(double)); - std::memcpy(remote_blob_writer->data(), double_array.data(), double_array.size() * sizeof(double)); - ObjectMeta blob_meta; - VINEYARD_CHECK_OK(c.CreateRemoteBlob(remote_blob_writer, blob_meta)); - return blob_meta.GetId(); - }; - - { - Client client; - VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - for (int i = 0; i < initial_objects; i++) { - object_ids.push_back(create_and_seal_array(client)); - } + const int num_threads = 500; + const int num_objects = 500; + const int array_size = 250; + const int initial_objects = 100; + + std::vector object_ids; + std::mutex object_ids_mutex; + + auto create_and_seal_array = [&](Client& c) { + try { + auto double_array = InitArray( + array_size, [](int i) { return static_cast(i); }); + ArrayBuilder builder(c, double_array); + auto sealed_array = + std::dynamic_pointer_cast>(builder.Seal(c)); + ObjectMeta meta = sealed_array->meta(); + std::shared_ptr buffer_set = meta.GetBufferSet(); + std::set ids = buffer_set->AllBufferIds(); + for (auto id : ids) { + c.Release(id); + } + return GetObjectID(sealed_array); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + return InvalidObjectID(); } - - auto worker = [&](int id, std::vector object_ids) { - Client client; - VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - RPCClient rpc_client; - VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); - - for (int i = 0; i < num_objects; ++i) { - if (id % 2 == 0) { - if (i % 3 == 0) { - ObjectID new_id = create_and_seal_array(client); - // { - // std::lock_guard lock(object_ids_mutex); - // object_ids.push_back(new_id); - //} - // } else { - // ObjectID id_to_get; - // { - // std::lock_guard lock(object_ids_mutex); - // if (!object_ids.empty()) { - // id_to_get = object_ids[rand() % object_ids.size()]; - // } - // } - // if (id_to_get != ObjectID()) { - // std::shared_ptr object; - client.GetObject(object_ids[i%object_ids.size()]); - // } - //} - } else { - //if (i % 3 == 0) { - ObjectID new_id = create_remote_blob(rpc_client); - // { - // std::lock_guard lock(object_ids_mutex); - // object_ids.push_back(new_id); - // } - //} else { - // ObjectID id_to_get; - // { - // std::lock_guard lock(object_ids_mutex); - // if (!object_ids.empty()) { - // id_to_get = object_ids[rand() % object_ids.size()]; - // } - // } - // if (id_to_get != ObjectID()) { - // std::shared_ptr object; - rpc_client.GetObject(object_ids[i%object_ids.size()]); - // } - } - } - } - }; - - std::vector threads; - for (int i = 0; i < num_threads; ++i) { - threads.emplace_back(worker, i, object_ids); + }; + + auto create_remote_blob = [&](RPCClient& c) { + try { + auto double_array = InitArray( + array_size, [](int i) { return static_cast(i); }); + auto remote_blob_writer = std::make_shared( + double_array.size() * sizeof(double)); + std::memcpy(remote_blob_writer->data(), double_array.data(), + double_array.size() * sizeof(double)); + ObjectMeta blob_meta; + VINEYARD_CHECK_OK(c.CreateRemoteBlob(remote_blob_writer, blob_meta)); + return blob_meta.GetId(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + return InvalidObjectID(); } + }; - for (auto& thread : threads) { - thread.join(); + { + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + for (int i = 0; i < initial_objects; i++) { + object_ids.push_back(create_and_seal_array(client)); } + } - { - Client client; - VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - VINEYARD_CHECK_OK(client.Clear()); + auto worker = [&](int id, std::vector object_ids) { + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + RPCClient rpc_client; + VINEYARD_CHECK_OK(rpc_client.Connect(rpc_endpoint)); + std::vector ids; + + for (int i = 0; i < num_objects; ++i) { + if (id % 2 == 0) { + if (i % 3 == 0) { + ObjectID new_id = create_and_seal_array(client); + if (new_id != InvalidObjectID()) { + client.Release(new_id); + } + + // { + // std::lock_guard lock(object_ids_mutex); + // object_ids.push_back(new_id); + //} + // } else { + // ObjectID id_to_get; + // { + // std::lock_guard lock(object_ids_mutex); + // if (!object_ids.empty()) { + // id_to_get = object_ids[rand() % object_ids.size()]; + // } + // } + // if (id_to_get != ObjectID()) { + // std::shared_ptr object; + // client.GetObject(object_ids[i%object_ids.size()]); + // client.Release(object_ids[i%object_ids.size()]); + // } + //} + } else { + // if (i % 3 == 0) { + ObjectID new_id = create_remote_blob(rpc_client); + // { + // std::lock_guard lock(object_ids_mutex); + // object_ids.push_back(new_id); + // } + //} else { + // ObjectID id_to_get; + // { + // std::lock_guard lock(object_ids_mutex); + // if (!object_ids.empty()) { + // id_to_get = object_ids[rand() % object_ids.size()]; + // } + // } + // if (id_to_get != ObjectID()) { + // std::shared_ptr object; + + if (new_id != InvalidObjectID()) { + rpc_client.GetObject(object_ids[i % object_ids.size()]); + ids.push_back(new_id); + } + // } + } + } } + }; + + std::vector threads; + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back(worker, i, object_ids); + } + + for (auto& thread : threads) { + thread.join(); + } + + { + Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + VINEYARD_CHECK_OK(client.Clear()); + } } int main(int argc, char** argv) { @@ -299,23 +336,23 @@ int main(int argc, char** argv) { } std::string ipc_socket = std::string(argv[1]); std::string rpc_endpoint = std::string(argv[2]); -/* - LOG(INFO) << "Start concurrent put test with IPCClient ..."; - ConcurrentPutWithClient(ipc_socket); - LOG(INFO) << "Passed concurrent put test with IPCClient"; - - LOG(INFO) << "Start concurrent get test with IPCClient ..."; - ConcurrentGetWithClient(ipc_socket); - LOG(INFO) << "Passed concurrent get test with IPCClient"; - - LOG(INFO) << "Start concurrent put test with RPCClient ..."; - ConcurrentPutWithRPCClient(rpc_endpoint); - LOG(INFO) << "Passed concurrent put test with RPCClient"; - - LOG(INFO) << "Start concurrent get test with RPCClient ..."; - ConcurrentGetWithRPCClient(rpc_endpoint); - LOG(INFO) << "Passed concurrent get test with RPCClient"; -*/ + /* + LOG(INFO) << "Start concurrent put test with IPCClient ..."; + ConcurrentPutWithClient(ipc_socket); + LOG(INFO) << "Passed concurrent put test with IPCClient"; + + LOG(INFO) << "Start concurrent get test with IPCClient ..."; + ConcurrentGetWithClient(ipc_socket); + LOG(INFO) << "Passed concurrent get test with IPCClient"; + + LOG(INFO) << "Start concurrent put test with RPCClient ..."; + ConcurrentPutWithRPCClient(rpc_endpoint); + LOG(INFO) << "Passed concurrent put test with RPCClient"; + + LOG(INFO) << "Start concurrent get test with RPCClient ..."; + ConcurrentGetWithRPCClient(rpc_endpoint); + LOG(INFO) << "Passed concurrent get test with RPCClient"; + */ LOG(INFO) << "Start concurrent get and put test ..."; ConcurrentGetAndPut(ipc_socket, rpc_endpoint); LOG(INFO) << "Passed concurrent get and put test";