diff --git a/modules/llm-cache/ds/vineyard_file.cc b/modules/llm-cache/ds/vineyard_file.cc index 47de9f9b7..ff84ab76a 100644 --- a/modules/llm-cache/ds/vineyard_file.cc +++ b/modules/llm-cache/ds/vineyard_file.cc @@ -41,9 +41,9 @@ void VineyardFile::Construct(const ObjectMeta& meta) { this->path_ = meta_.GetKeyValue("path"); this->access_time_ = meta_.GetKeyValue("access_time"); ObjectMeta blob_meta; - meta_.GetMemberMeta("buffer", blob_meta); + VINEYARD_CHECK_OK(meta_.GetMemberMeta("buffer", blob_meta)); ObjectID blob_id = blob_meta.GetId(); - meta.GetBuffer(blob_id, buffer_); + VINEYARD_CHECK_OK(meta.GetBuffer(blob_id, buffer_)); } Status VineyardFile::Read(void* buffer, size_t size, size_t offset) { @@ -69,7 +69,7 @@ Status VineyardFile::Make(std::shared_ptr& file, if (!ipc_client.GetName(origin_path, file_id, false).ok()) { return Status::IOError("File " + path + " is not exist."); } - ipc_client.GetMetaData(file_id, meta, true); + RETURN_ON_ERROR(ipc_client.GetMetaData(file_id, meta, true)); if (meta.GetInstanceId() == ipc_client.instance_id()) { object = ipc_client.GetObject(file_id); file = std::dynamic_pointer_cast(object); @@ -88,7 +88,7 @@ Status VineyardFile::Make(std::shared_ptr& file, } std::map cluster_info; - rpc_client.ClusterInfo(cluster_info); + RETURN_ON_ERROR(rpc_client.ClusterInfo(cluster_info)); if (object_meta.GetInstanceId() == rpc_client.remote_instance_id()) { object = rpc_client.GetObject(file_id); } else { @@ -104,7 +104,7 @@ Status VineyardFile::Make(std::shared_ptr& file, object = remote_rpc_client.GetObject(file_id); ObjectID buffer_id = object_meta.GetMember("buffer")->id(); std::shared_ptr blob; - remote_rpc_client.GetRemoteBlob(buffer_id, blob); + RETURN_ON_ERROR(remote_rpc_client.GetRemoteBlob(buffer_id, blob)); std::dynamic_pointer_cast(object)->buffer_ = blob->Buffer(); } @@ -120,7 +120,7 @@ Status VineyardFile::BatchedGetObjects( std::map>& instance_to_metas, std::unordered_map>& id_to_files) { std::map cluster_info; - rpc_client.ClusterInfo(cluster_info); + RETURN_ON_ERROR(rpc_client.ClusterInfo(cluster_info)); auto fn = [&](std::pair>& instance_to_meta) -> Status { std::vector> file_objects; @@ -130,7 +130,7 @@ Status VineyardFile::BatchedGetObjects( ids[i] = instance_to_meta.second[i].GetId(); } instance_to_meta.second.clear(); - client.GetMetaData(ids, instance_to_meta.second, false); + RETURN_ON_ERROR(client.GetMetaData(ids, instance_to_meta.second, false)); file_objects = client.GetObjects(instance_to_meta.second); } else { if (rpc_client.remote_instance_id() == instance_to_meta.first) { @@ -139,7 +139,8 @@ Status VineyardFile::BatchedGetObjects( ids[i] = instance_to_meta.second[i].GetId(); } instance_to_meta.second.clear(); - rpc_client.GetMetaData(ids, instance_to_meta.second, false); + RETURN_ON_ERROR( + rpc_client.GetMetaData(ids, instance_to_meta.second, false)); RETURN_ON_ERROR(rpc_client.BatchedGetObjects(instance_to_meta.second, file_objects)); } else { @@ -162,7 +163,8 @@ Status VineyardFile::BatchedGetObjects( * caller rpc_client, so we need to get meta again. */ instance_to_meta.second.clear(); - remote_rpc_client.GetMetaData(ids, instance_to_meta.second, false); + RETURN_ON_ERROR( + remote_rpc_client.GetMetaData(ids, instance_to_meta.second, false)); RETURN_ON_ERROR(remote_rpc_client.BatchedGetObjects( instance_to_meta.second, file_objects)); } @@ -204,7 +206,7 @@ Status VineyardFile::BatchedMake( std::vector file_metas; std::map clusterInfo; - rpc_client.ClusterInfo(clusterInfo); + RETURN_ON_ERROR(rpc_client.ClusterInfo(clusterInfo)); std::map> instance_to_metas; if (ipc_client.Connected()) { for (auto const& path : origin_paths) { @@ -214,7 +216,7 @@ Status VineyardFile::BatchedMake( } else { break; } - ipc_client.GetMetaData(file_ids, file_metas, true); + RETURN_ON_ERROR(ipc_client.GetMetaData(file_ids, file_metas, true)); } } else { // RPC @@ -226,7 +228,7 @@ Status VineyardFile::BatchedMake( break; } } - rpc_client.GetMetaData(file_ids, file_metas, true); + RETURN_ON_ERROR(rpc_client.GetMetaData(file_ids, file_metas, true)); } for (const auto& meta : file_metas) { instance_to_metas[meta.GetInstanceId()].push_back(meta); @@ -273,13 +275,13 @@ std::shared_ptr VineyardFileBuilder::SealAndPersist( ObjectMeta blob_meta; if (ipc_client.Connected()) { std::shared_ptr object; - writer_->Shrink(ipc_client, writer_->size()); - writer_->Seal(ipc_client, object); + VINEYARD_DISCARD(writer_->Shrink(ipc_client, writer_->size())); + VINEYARD_CHECK_OK(writer_->Seal(ipc_client, object)); blob_meta = object->meta(); - ipc_client.Persist(blob_meta.GetId()); + VINEYARD_CHECK_OK(ipc_client.Persist(blob_meta.GetId())); } else { - rpc_client.CreateRemoteBlob(remote_writer_, blob_meta); - rpc_client.Persist(blob_meta.GetId()); + VINEYARD_CHECK_OK(rpc_client.CreateRemoteBlob(remote_writer_, blob_meta)); + VINEYARD_CHECK_OK(rpc_client.Persist(blob_meta.GetId())); } vineyardFile->meta_.AddMember("buffer", blob_meta); vineyardFile->meta_.AddKeyValue("path", path_); @@ -299,7 +301,7 @@ std::shared_ptr VineyardFileBuilder::SealAndPersist( } else { VINEYARD_CHECK_OK( rpc_client.CreateMetaData(vineyardFile->meta_, vineyardFile->id_)); - rpc_client.Persist(vineyardFile->id_); + VINEYARD_CHECK_OK(rpc_client.Persist(vineyardFile->id_)); Status status = rpc_client.PutName(vineyardFile->id_, path_); } @@ -314,8 +316,9 @@ std::vector> VineyardFileBuilder::BatchedSealAndPersist( if (ipc_client.Connected()) { for (auto builder : builders) { std::shared_ptr object; - builder->writer_->Shrink(ipc_client, builder->writer_->size()); - builder->writer_->Seal(ipc_client, object); + VINEYARD_DISCARD( + builder->writer_->Shrink(ipc_client, builder->writer_->size())); + VINEYARD_CHECK_OK(builder->writer_->Seal(ipc_client, object)); blob_metas.push_back(object->meta()); } } else { @@ -324,16 +327,16 @@ std::vector> VineyardFileBuilder::BatchedSealAndPersist( VINEYARD_CHECK_OK(builder->Build(rpc_client, ipc_client)); remote_writers.push_back(builder->remote_writer_); } - rpc_client.CreateRemoteBlobs(remote_writers, blob_metas); + VINEYARD_CHECK_OK(rpc_client.CreateRemoteBlobs(remote_writers, blob_metas)); } for (size_t i = 0; i < blob_metas.size(); i++) { std::shared_ptr vineyard_file = std::make_shared(); if (ipc_client.Connected()) { - ipc_client.Persist(blob_metas[i].GetId()); + VINEYARD_CHECK_OK(ipc_client.Persist(blob_metas[i].GetId())); } else { - rpc_client.Persist(blob_metas[i].GetId()); + VINEYARD_CHECK_OK(rpc_client.Persist(blob_metas[i].GetId())); } vineyard_file->meta_.AddMember("buffer", blob_metas[i]); vineyard_file->meta_.AddKeyValue("path", builders[i]->path_); diff --git a/modules/llm-cache/storage/file_storage.cc b/modules/llm-cache/storage/file_storage.cc index 28168705f..0732aaa58 100644 --- a/modules/llm-cache/storage/file_storage.cc +++ b/modules/llm-cache/storage/file_storage.cc @@ -499,7 +499,7 @@ Status FileStorage::BatchedUpdate( } } - BatchedClose(read_fd_list); + VINEYARD_DISCARD(BatchedClose(read_fd_list)); std::vector> write_fd_list; std::vector left_path(pathList.begin() + lower_bound, diff --git a/src/server/async/rpc_server.cc b/src/server/async/rpc_server.cc index 9b4af00d2..0ea981905 100644 --- a/src/server/async/rpc_server.cc +++ b/src/server/async/rpc_server.cc @@ -262,7 +262,7 @@ void RPCServer::doVineyardRequestMemory(VineyardRecvContext* recv_context, << " fid_mr:" << remote_request_mem_info.mr; void* msg = nullptr; - rdma_server_->GetTXFreeMsgBuffer(msg); + VINEYARD_DISCARD(rdma_server_->GetTXFreeMsgBuffer(msg)); VineyardMsg* send_msg = reinterpret_cast(msg); send_msg->type = VINEYARD_MSG_REQUEST_MEM; send_msg->remoteMemInfo.remote_address = @@ -280,8 +280,8 @@ void RPCServer::doVineyardRequestMemory(VineyardRecvContext* recv_context, remote_mem_infos_[recv_context->rdma_conn_id].insert(std::make_pair( remote_request_mem_info.mr_desc, remote_request_mem_info)); } - rdma_server_->Send(recv_context->rdma_conn_id, msg, sizeof(VineyardMsg), - send_context); + VINEYARD_CHECK_OK(rdma_server_->Send(recv_context->rdma_conn_id, msg, + sizeof(VineyardMsg), send_context)); VLOG(100) << "Send key:" << remote_request_mem_info.rkey << " send address:" << reinterpret_cast(remote_request_mem_info.address) << " size: " << remote_request_mem_info.size; @@ -332,7 +332,9 @@ void RPCServer::doVineyardClose(VineyardRecvContext* recv_context) { if (recv_context == nullptr) { return; } - rdma_server_->CloseConnection(recv_context->rdma_conn_id); + if (!rdma_server_->CloseConnection(recv_context->rdma_conn_id).ok()) { + LOG(ERROR) << "Close connection failed!"; + } std::lock_guard scope_lock(this->rdma_mutex_); { @@ -353,14 +355,14 @@ void RPCServer::doPrepareRecv(uint64_t rdma_conn_id) { recv_context->rdma_conn_id = rdma_conn_id; void* context = reinterpret_cast(recv_context); void* msg = nullptr; - rdma_server_->GetRXFreeMsgBuffer(msg); + VINEYARD_DISCARD(rdma_server_->GetRXFreeMsgBuffer(msg)); recv_context->attr.msg_buffer = msg; rdma_server_->Recv(rdma_conn_id, msg, sizeof(VineyardMsg), context); } void RPCServer::doNothing(VineyardRecvContext* recv_context) { void* msg = nullptr; - rdma_server_->GetTXFreeMsgBuffer(msg); + VINEYARD_DISCARD(rdma_server_->GetTXFreeMsgBuffer(msg)); VineyardMsg* send_msg = reinterpret_cast(msg); send_msg->type = VINEYARD_MSG_REQUEST_MEM; @@ -372,8 +374,8 @@ void RPCServer::doNothing(VineyardRecvContext* recv_context) { VineyardSendContext* send_context = new VineyardSendContext(); memset(send_context, 0, sizeof(VineyardSendContext)); send_context->attr.msg_buffer = msg; - rdma_server_->Send(recv_context->rdma_conn_id, msg, sizeof(VineyardMsg), - send_context); + VINEYARD_DISCARD(rdma_server_->Send(recv_context->rdma_conn_id, msg, + sizeof(VineyardMsg), send_context)); } void RPCServer::doRDMARecv() { @@ -430,9 +432,9 @@ void RPCServer::doRDMARecv() { delete recv_msg_tmp; delete recv_context_tmp; }); - rdma_server_->Recv( + VINEYARD_CHECK_OK(rdma_server_->Recv( recv_context->rdma_conn_id, reinterpret_cast(recv_msg), - sizeof(VineyardMsg), reinterpret_cast(recv_context)); + sizeof(VineyardMsg), reinterpret_cast(recv_context))); } else if (recv_msg->type == VINEYARD_MSG_RELEASE_MEM) { boost::asio::post( vs_ptr_->GetIOContext(), [this, recv_context_tmp, recv_msg_tmp] { @@ -440,9 +442,9 @@ void RPCServer::doRDMARecv() { delete recv_msg_tmp; delete recv_context_tmp; }); - rdma_server_->Recv( + VINEYARD_CHECK_OK(rdma_server_->Recv( recv_context->rdma_conn_id, reinterpret_cast(recv_msg), - sizeof(VineyardMsg), reinterpret_cast(recv_context)); + sizeof(VineyardMsg), reinterpret_cast(recv_context))); } else if (recv_msg->type == VINEYARD_MSG_EMPTY) { boost::asio::post(vs_ptr_->GetIOContext(), [this, recv_context_tmp, recv_msg_tmp] { @@ -450,14 +452,14 @@ void RPCServer::doRDMARecv() { delete recv_msg_tmp; delete recv_context_tmp; }); - rdma_server_->Recv( + VINEYARD_CHECK_OK(rdma_server_->Recv( recv_context->rdma_conn_id, reinterpret_cast(recv_msg), - sizeof(VineyardMsg), reinterpret_cast(recv_context)); + sizeof(VineyardMsg), reinterpret_cast(recv_context))); } else { LOG(ERROR) << "Unknown message type: " << recv_msg->type; - rdma_server_->Recv( + VINEYARD_CHECK_OK(rdma_server_->Recv( recv_context->rdma_conn_id, reinterpret_cast(recv_msg), - sizeof(VineyardMsg), reinterpret_cast(recv_context)); + sizeof(VineyardMsg), reinterpret_cast(recv_context))); } } } diff --git a/src/server/server/vineyard_server.cc b/src/server/server/vineyard_server.cc index c5ce251dc..982f8d538 100644 --- a/src/server/server/vineyard_server.cc +++ b/src/server/server/vineyard_server.cc @@ -1473,7 +1473,7 @@ void VineyardServer::Stop() { this->bulk_store_.reset(); this->plasma_bulk_store_.reset(); - RDMAClientCreator::Clear(); + VINEYARD_DISCARD(RDMAClientCreator::Clear()); } bool VineyardServer::Running() const { return !stopped_.load(); } diff --git a/src/server/server/vineyard_server.h b/src/server/server/vineyard_server.h index 7381786a4..085310aad 100644 --- a/src/server/server/vineyard_server.h +++ b/src/server/server/vineyard_server.h @@ -316,8 +316,9 @@ class VineyardServer : public std::enable_shared_from_this { } pendding_to_delete_objects_.clear(); } - this->DelData(ids, false, false, false, false, - [](const Status& status) { return Status::OK(); }); + VINEYARD_DISCARD( + this->DelData(ids, false, false, false, false, + [](const Status& status) { return Status::OK(); })); } void AddPendingObjects(std::vector const& ids) { diff --git a/src/server/util/remote.cc b/src/server/util/remote.cc index ee69f0059..afac7db93 100644 --- a/src/server/util/remote.cc +++ b/src/server/util/remote.cc @@ -108,7 +108,7 @@ Status RemoteClient::Connect(const std::string& rpc_endpoint, Status RemoteClient::RDMARequestMemInfo(RegisterMemInfo& remote_info) { void* buffer; - this->rdma_client_->GetTXFreeMsgBuffer(buffer); + VINEYARD_DISCARD(this->rdma_client_->GetTXFreeMsgBuffer(buffer)); VineyardMsg* msg = reinterpret_cast(buffer); msg->type = VINEYARD_MSG_REQUEST_MEM; msg->remoteMemInfo.remote_address = (uint64_t) remote_info.address; @@ -116,7 +116,7 @@ Status RemoteClient::RDMARequestMemInfo(RegisterMemInfo& remote_info) { VLOG(100) << "Request remote addr: " << reinterpret_cast(msg->remoteMemInfo.remote_address); void* remoteMsg; - this->rdma_client_->GetRXFreeMsgBuffer(remoteMsg); + VINEYARD_DISCARD(this->rdma_client_->GetRXFreeMsgBuffer(remoteMsg)); memset(remoteMsg, 0, 64); VINEYARD_CHECK_OK( this->rdma_client_->Recv(remoteMsg, sizeof(VineyardMsg), nullptr)); @@ -142,7 +142,7 @@ Status RemoteClient::RDMARequestMemInfo(RegisterMemInfo& remote_info) { Status RemoteClient::RDMAReleaseMemInfo(RegisterMemInfo& remote_info) { void* buffer; - this->rdma_client_->GetTXFreeMsgBuffer(buffer); + VINEYARD_DISCARD(this->rdma_client_->GetTXFreeMsgBuffer(buffer)); VineyardMsg* msg = reinterpret_cast(buffer); msg->type = VINEYARD_MSG_RELEASE_MEM; msg->remoteMemInfo.remote_address = (uint64_t) remote_info.address; @@ -151,7 +151,8 @@ Status RemoteClient::RDMAReleaseMemInfo(RegisterMemInfo& remote_info) { << reinterpret_cast(msg->remoteMemInfo.remote_address) << ", rkey: " << msg->remoteMemInfo.key; - this->rdma_client_->Send(buffer, sizeof(VineyardMsg), nullptr); + RETURN_ON_ERROR( + this->rdma_client_->Send(buffer, sizeof(VineyardMsg), nullptr)); VINEYARD_CHECK_OK(rdma_client_->GetTXCompletion(-1, nullptr)); return Status::OK();