Skip to content

Commit

Permalink
Fix compile warnings.
Browse files Browse the repository at this point in the history
Signed-off-by: vegetableysm <[email protected]>
  • Loading branch information
vegetableysm committed Sep 2, 2024
1 parent d28a3c9 commit 04b300f
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 47 deletions.
49 changes: 26 additions & 23 deletions modules/llm-cache/ds/vineyard_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ void VineyardFile::Construct(const ObjectMeta& meta) {
this->path_ = meta_.GetKeyValue("path");
this->access_time_ = meta_.GetKeyValue<uint64_t>("access_time");
ObjectMeta blob_meta;
meta_.GetMemberMeta("buffer", blob_meta);
VINEYARD_CHECK_OK(meta_.GetMemberMeta("buffer", blob_meta));
ObjectID blob_id = blob_meta.GetId();
meta.GetBuffer(blob_id, buffer_);
VINEYARD_CHECK_OK(meta.GetBuffer(blob_id, buffer_));
}

Status VineyardFile::Read(void* buffer, size_t size, size_t offset) {
Expand All @@ -69,7 +69,7 @@ Status VineyardFile::Make(std::shared_ptr<VineyardFile>& file,
if (!ipc_client.GetName(origin_path, file_id, false).ok()) {
return Status::IOError("File " + path + " is not exist.");
}
ipc_client.GetMetaData(file_id, meta, true);
RETURN_ON_ERROR(ipc_client.GetMetaData(file_id, meta, true));
if (meta.GetInstanceId() == ipc_client.instance_id()) {
object = ipc_client.GetObject(file_id);
file = std::dynamic_pointer_cast<VineyardFile>(object);
Expand All @@ -88,7 +88,7 @@ Status VineyardFile::Make(std::shared_ptr<VineyardFile>& file,
}

std::map<InstanceID, json> cluster_info;
rpc_client.ClusterInfo(cluster_info);
RETURN_ON_ERROR(rpc_client.ClusterInfo(cluster_info));
if (object_meta.GetInstanceId() == rpc_client.remote_instance_id()) {
object = rpc_client.GetObject(file_id);
} else {
Expand All @@ -104,7 +104,7 @@ Status VineyardFile::Make(std::shared_ptr<VineyardFile>& file,
object = remote_rpc_client.GetObject(file_id);
ObjectID buffer_id = object_meta.GetMember("buffer")->id();
std::shared_ptr<RemoteBlob> blob;
remote_rpc_client.GetRemoteBlob(buffer_id, blob);
RETURN_ON_ERROR(remote_rpc_client.GetRemoteBlob(buffer_id, blob));
std::dynamic_pointer_cast<VineyardFile>(object)->buffer_ = blob->Buffer();
}

Expand All @@ -120,7 +120,7 @@ Status VineyardFile::BatchedGetObjects(
std::map<InstanceID, std::vector<ObjectMeta>>& instance_to_metas,
std::unordered_map<ObjectID, std::shared_ptr<VineyardFile>>& id_to_files) {
std::map<InstanceID, json> cluster_info;
rpc_client.ClusterInfo(cluster_info);
RETURN_ON_ERROR(rpc_client.ClusterInfo(cluster_info));
auto fn = [&](std::pair<const InstanceID, std::vector<ObjectMeta>>&
instance_to_meta) -> Status {
std::vector<std::shared_ptr<Object>> file_objects;
Expand All @@ -130,7 +130,7 @@ Status VineyardFile::BatchedGetObjects(
ids[i] = instance_to_meta.second[i].GetId();
}
instance_to_meta.second.clear();
client.GetMetaData(ids, instance_to_meta.second, false);
RETURN_ON_ERROR(client.GetMetaData(ids, instance_to_meta.second, false));
file_objects = client.GetObjects(instance_to_meta.second);
} else {
if (rpc_client.remote_instance_id() == instance_to_meta.first) {
Expand All @@ -139,7 +139,8 @@ Status VineyardFile::BatchedGetObjects(
ids[i] = instance_to_meta.second[i].GetId();
}
instance_to_meta.second.clear();
rpc_client.GetMetaData(ids, instance_to_meta.second, false);
RETURN_ON_ERROR(
rpc_client.GetMetaData(ids, instance_to_meta.second, false));
RETURN_ON_ERROR(rpc_client.BatchedGetObjects(instance_to_meta.second,
file_objects));
} else {
Expand All @@ -162,7 +163,8 @@ Status VineyardFile::BatchedGetObjects(
* caller rpc_client, so we need to get meta again.
*/
instance_to_meta.second.clear();
remote_rpc_client.GetMetaData(ids, instance_to_meta.second, false);
RETURN_ON_ERROR(
remote_rpc_client.GetMetaData(ids, instance_to_meta.second, false));
RETURN_ON_ERROR(remote_rpc_client.BatchedGetObjects(
instance_to_meta.second, file_objects));
}
Expand Down Expand Up @@ -204,7 +206,7 @@ Status VineyardFile::BatchedMake(

std::vector<ObjectMeta> file_metas;
std::map<InstanceID, vineyard::json> clusterInfo;
rpc_client.ClusterInfo(clusterInfo);
RETURN_ON_ERROR(rpc_client.ClusterInfo(clusterInfo));
std::map<InstanceID, std::vector<ObjectMeta>> instance_to_metas;
if (ipc_client.Connected()) {
for (auto const& path : origin_paths) {
Expand All @@ -214,7 +216,7 @@ Status VineyardFile::BatchedMake(
} else {
break;
}
ipc_client.GetMetaData(file_ids, file_metas, true);
RETURN_ON_ERROR(ipc_client.GetMetaData(file_ids, file_metas, true));
}
} else {
// RPC
Expand All @@ -226,7 +228,7 @@ Status VineyardFile::BatchedMake(
break;
}
}
rpc_client.GetMetaData(file_ids, file_metas, true);
RETURN_ON_ERROR(rpc_client.GetMetaData(file_ids, file_metas, true));
}
for (const auto& meta : file_metas) {
instance_to_metas[meta.GetInstanceId()].push_back(meta);
Expand Down Expand Up @@ -273,13 +275,13 @@ std::shared_ptr<Object> VineyardFileBuilder::SealAndPersist(
ObjectMeta blob_meta;
if (ipc_client.Connected()) {
std::shared_ptr<Object> object;
writer_->Shrink(ipc_client, writer_->size());
writer_->Seal(ipc_client, object);
VINEYARD_DISCARD(writer_->Shrink(ipc_client, writer_->size()));
VINEYARD_CHECK_OK(writer_->Seal(ipc_client, object));
blob_meta = object->meta();
ipc_client.Persist(blob_meta.GetId());
VINEYARD_CHECK_OK(ipc_client.Persist(blob_meta.GetId()));
} else {
rpc_client.CreateRemoteBlob(remote_writer_, blob_meta);
rpc_client.Persist(blob_meta.GetId());
VINEYARD_CHECK_OK(rpc_client.CreateRemoteBlob(remote_writer_, blob_meta));
VINEYARD_CHECK_OK(rpc_client.Persist(blob_meta.GetId()));
}
vineyardFile->meta_.AddMember("buffer", blob_meta);
vineyardFile->meta_.AddKeyValue("path", path_);
Expand All @@ -299,7 +301,7 @@ std::shared_ptr<Object> VineyardFileBuilder::SealAndPersist(
} else {
VINEYARD_CHECK_OK(
rpc_client.CreateMetaData(vineyardFile->meta_, vineyardFile->id_));
rpc_client.Persist(vineyardFile->id_);
VINEYARD_CHECK_OK(rpc_client.Persist(vineyardFile->id_));
Status status = rpc_client.PutName(vineyardFile->id_, path_);
}

Expand All @@ -314,8 +316,9 @@ std::vector<std::shared_ptr<Object>> VineyardFileBuilder::BatchedSealAndPersist(
if (ipc_client.Connected()) {
for (auto builder : builders) {
std::shared_ptr<Object> object;
builder->writer_->Shrink(ipc_client, builder->writer_->size());
builder->writer_->Seal(ipc_client, object);
VINEYARD_DISCARD(
builder->writer_->Shrink(ipc_client, builder->writer_->size()));
VINEYARD_CHECK_OK(builder->writer_->Seal(ipc_client, object));
blob_metas.push_back(object->meta());
}
} else {
Expand All @@ -324,16 +327,16 @@ std::vector<std::shared_ptr<Object>> VineyardFileBuilder::BatchedSealAndPersist(
VINEYARD_CHECK_OK(builder->Build(rpc_client, ipc_client));
remote_writers.push_back(builder->remote_writer_);
}
rpc_client.CreateRemoteBlobs(remote_writers, blob_metas);
VINEYARD_CHECK_OK(rpc_client.CreateRemoteBlobs(remote_writers, blob_metas));
}

for (size_t i = 0; i < blob_metas.size(); i++) {
std::shared_ptr<VineyardFile> vineyard_file =
std::make_shared<VineyardFile>();
if (ipc_client.Connected()) {
ipc_client.Persist(blob_metas[i].GetId());
VINEYARD_CHECK_OK(ipc_client.Persist(blob_metas[i].GetId()));
} else {
rpc_client.Persist(blob_metas[i].GetId());
VINEYARD_CHECK_OK(rpc_client.Persist(blob_metas[i].GetId()));
}
vineyard_file->meta_.AddMember("buffer", blob_metas[i]);
vineyard_file->meta_.AddKeyValue("path", builders[i]->path_);
Expand Down
2 changes: 1 addition & 1 deletion modules/llm-cache/storage/file_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ Status FileStorage::BatchedUpdate(
}
}

BatchedClose(read_fd_list);
VINEYARD_DISCARD(BatchedClose(read_fd_list));

std::vector<std::shared_ptr<FileDescriptor>> write_fd_list;
std::vector<std::string> left_path(pathList.begin() + lower_bound,
Expand Down
34 changes: 18 additions & 16 deletions src/server/async/rpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ void RPCServer::doVineyardRequestMemory(VineyardRecvContext* recv_context,
<< " fid_mr:" << remote_request_mem_info.mr;

void* msg = nullptr;
rdma_server_->GetTXFreeMsgBuffer(msg);
VINEYARD_DISCARD(rdma_server_->GetTXFreeMsgBuffer(msg));
VineyardMsg* send_msg = reinterpret_cast<VineyardMsg*>(msg);
send_msg->type = VINEYARD_MSG_REQUEST_MEM;
send_msg->remoteMemInfo.remote_address =
Expand All @@ -280,8 +280,8 @@ void RPCServer::doVineyardRequestMemory(VineyardRecvContext* recv_context,
remote_mem_infos_[recv_context->rdma_conn_id].insert(std::make_pair(
remote_request_mem_info.mr_desc, remote_request_mem_info));
}
rdma_server_->Send(recv_context->rdma_conn_id, msg, sizeof(VineyardMsg),
send_context);
VINEYARD_CHECK_OK(rdma_server_->Send(recv_context->rdma_conn_id, msg,
sizeof(VineyardMsg), send_context));
VLOG(100) << "Send key:" << remote_request_mem_info.rkey << " send address:"
<< reinterpret_cast<void*>(remote_request_mem_info.address)
<< " size: " << remote_request_mem_info.size;
Expand Down Expand Up @@ -332,7 +332,9 @@ void RPCServer::doVineyardClose(VineyardRecvContext* recv_context) {
if (recv_context == nullptr) {
return;
}
rdma_server_->CloseConnection(recv_context->rdma_conn_id);
if (!rdma_server_->CloseConnection(recv_context->rdma_conn_id).ok()) {
LOG(ERROR) << "Close connection failed!";
}

std::lock_guard<std::recursive_mutex> scope_lock(this->rdma_mutex_);
{
Expand All @@ -353,14 +355,14 @@ void RPCServer::doPrepareRecv(uint64_t rdma_conn_id) {
recv_context->rdma_conn_id = rdma_conn_id;
void* context = reinterpret_cast<void*>(recv_context);
void* msg = nullptr;
rdma_server_->GetRXFreeMsgBuffer(msg);
VINEYARD_DISCARD(rdma_server_->GetRXFreeMsgBuffer(msg));
recv_context->attr.msg_buffer = msg;
rdma_server_->Recv(rdma_conn_id, msg, sizeof(VineyardMsg), context);
}

void RPCServer::doNothing(VineyardRecvContext* recv_context) {
void* msg = nullptr;
rdma_server_->GetTXFreeMsgBuffer(msg);
VINEYARD_DISCARD(rdma_server_->GetTXFreeMsgBuffer(msg));
VineyardMsg* send_msg = reinterpret_cast<VineyardMsg*>(msg);
send_msg->type = VINEYARD_MSG_REQUEST_MEM;

Expand All @@ -372,8 +374,8 @@ void RPCServer::doNothing(VineyardRecvContext* recv_context) {
VineyardSendContext* send_context = new VineyardSendContext();
memset(send_context, 0, sizeof(VineyardSendContext));
send_context->attr.msg_buffer = msg;
rdma_server_->Send(recv_context->rdma_conn_id, msg, sizeof(VineyardMsg),
send_context);
VINEYARD_DISCARD(rdma_server_->Send(recv_context->rdma_conn_id, msg,
sizeof(VineyardMsg), send_context));
}

void RPCServer::doRDMARecv() {
Expand Down Expand Up @@ -430,34 +432,34 @@ void RPCServer::doRDMARecv() {
delete recv_msg_tmp;
delete recv_context_tmp;
});
rdma_server_->Recv(
VINEYARD_CHECK_OK(rdma_server_->Recv(
recv_context->rdma_conn_id, reinterpret_cast<void*>(recv_msg),
sizeof(VineyardMsg), reinterpret_cast<void*>(recv_context));
sizeof(VineyardMsg), reinterpret_cast<void*>(recv_context)));
} else if (recv_msg->type == VINEYARD_MSG_RELEASE_MEM) {
boost::asio::post(
vs_ptr_->GetIOContext(), [this, recv_context_tmp, recv_msg_tmp] {
doVineyardReleaseMemory(recv_context_tmp, recv_msg_tmp);
delete recv_msg_tmp;
delete recv_context_tmp;
});
rdma_server_->Recv(
VINEYARD_CHECK_OK(rdma_server_->Recv(
recv_context->rdma_conn_id, reinterpret_cast<void*>(recv_msg),
sizeof(VineyardMsg), reinterpret_cast<void*>(recv_context));
sizeof(VineyardMsg), reinterpret_cast<void*>(recv_context)));
} else if (recv_msg->type == VINEYARD_MSG_EMPTY) {
boost::asio::post(vs_ptr_->GetIOContext(),
[this, recv_context_tmp, recv_msg_tmp] {
doNothing(recv_context_tmp);
delete recv_msg_tmp;
delete recv_context_tmp;
});
rdma_server_->Recv(
VINEYARD_CHECK_OK(rdma_server_->Recv(
recv_context->rdma_conn_id, reinterpret_cast<void*>(recv_msg),
sizeof(VineyardMsg), reinterpret_cast<void*>(recv_context));
sizeof(VineyardMsg), reinterpret_cast<void*>(recv_context)));
} else {
LOG(ERROR) << "Unknown message type: " << recv_msg->type;
rdma_server_->Recv(
VINEYARD_CHECK_OK(rdma_server_->Recv(
recv_context->rdma_conn_id, reinterpret_cast<void*>(recv_msg),
sizeof(VineyardMsg), reinterpret_cast<void*>(recv_context));
sizeof(VineyardMsg), reinterpret_cast<void*>(recv_context)));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/server/vineyard_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,7 @@ void VineyardServer::Stop() {
this->bulk_store_.reset();
this->plasma_bulk_store_.reset();

RDMAClientCreator::Clear();
VINEYARD_DISCARD(RDMAClientCreator::Clear());
}

bool VineyardServer::Running() const { return !stopped_.load(); }
Expand Down
5 changes: 3 additions & 2 deletions src/server/server/vineyard_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,9 @@ class VineyardServer : public std::enable_shared_from_this<VineyardServer> {
}
pendding_to_delete_objects_.clear();
}
this->DelData(ids, false, false, false, false,
[](const Status& status) { return Status::OK(); });
VINEYARD_DISCARD(
this->DelData(ids, false, false, false, false,
[](const Status& status) { return Status::OK(); }));
}

void AddPendingObjects(std::vector<ObjectID> const& ids) {
Expand Down
9 changes: 5 additions & 4 deletions src/server/util/remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ Status RemoteClient::Connect(const std::string& rpc_endpoint,

Status RemoteClient::RDMARequestMemInfo(RegisterMemInfo& remote_info) {
void* buffer;
this->rdma_client_->GetTXFreeMsgBuffer(buffer);
VINEYARD_DISCARD(this->rdma_client_->GetTXFreeMsgBuffer(buffer));
VineyardMsg* msg = reinterpret_cast<VineyardMsg*>(buffer);
msg->type = VINEYARD_MSG_REQUEST_MEM;
msg->remoteMemInfo.remote_address = (uint64_t) remote_info.address;
msg->remoteMemInfo.len = remote_info.size;
VLOG(100) << "Request remote addr: "
<< reinterpret_cast<void*>(msg->remoteMemInfo.remote_address);
void* remoteMsg;
this->rdma_client_->GetRXFreeMsgBuffer(remoteMsg);
VINEYARD_DISCARD(this->rdma_client_->GetRXFreeMsgBuffer(remoteMsg));
memset(remoteMsg, 0, 64);
VINEYARD_CHECK_OK(
this->rdma_client_->Recv(remoteMsg, sizeof(VineyardMsg), nullptr));
Expand All @@ -142,7 +142,7 @@ Status RemoteClient::RDMARequestMemInfo(RegisterMemInfo& remote_info) {

Status RemoteClient::RDMAReleaseMemInfo(RegisterMemInfo& remote_info) {
void* buffer;
this->rdma_client_->GetTXFreeMsgBuffer(buffer);
VINEYARD_DISCARD(this->rdma_client_->GetTXFreeMsgBuffer(buffer));
VineyardMsg* msg = reinterpret_cast<VineyardMsg*>(buffer);
msg->type = VINEYARD_MSG_RELEASE_MEM;
msg->remoteMemInfo.remote_address = (uint64_t) remote_info.address;
Expand All @@ -151,7 +151,8 @@ Status RemoteClient::RDMAReleaseMemInfo(RegisterMemInfo& remote_info) {
<< reinterpret_cast<void*>(msg->remoteMemInfo.remote_address)
<< ", rkey: " << msg->remoteMemInfo.key;

this->rdma_client_->Send(buffer, sizeof(VineyardMsg), nullptr);
RETURN_ON_ERROR(
this->rdma_client_->Send(buffer, sizeof(VineyardMsg), nullptr));
VINEYARD_CHECK_OK(rdma_client_->GetTXCompletion(-1, nullptr));

return Status::OK();
Expand Down

0 comments on commit 04b300f

Please sign in to comment.