Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixGwhisperHangOnDescDbStreamClose #147

Merged
merged 5 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/libCli/Call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,19 @@ namespace cli
deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(customTimeoutMs);
}
}


//before calling the RPC, close the DescDb connection with a timeout.
grpc::Status dbDescStatus = ConnectionManager::getInstance().closeDescDbWithDeadline(serverAddress, deadline);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was about to suggest moving all the closing into destructors and just drop the descriptor database / proxy when we do not need it any more.
However the ConnectionManager singleton makes life hard here. gWhisper does not have the most beautiful design :-(
So I am fine with it as is. what do you think?

Copy link
Author

@abb3r abb3r Nov 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah ConnectionManager actually makes it hard. since we are also using shared ptrs here we would not know how many owners are there for the proxy and when will it actually closes.
Doing explicitly allows us to check the grpc status/result from calling the stream finish() aswell.

if (not dbDescStatus.ok())
{
std::cerr << "Failed to close reflection stream ;( Status code: " << std::to_string(dbDescStatus.error_code()) << " " << cli::getGrpcStatusCodeAsString(dbDescStatus.error_code()) << ", error message: " << dbDescStatus.error_message() << std::endl;
if(dbDescStatus.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED)
{
std::cerr << "Note: You can increase the deadline by setting the --rpcTimeoutMilliseconds option to a number or 'None'." << std::endl;
}
return -1;
}

grpc::testing::CliCall call(channel, methodStr, clientMetadata, deadline);

auto messageFormatter = createMessageFormatter(parseTree);
Expand Down
18 changes: 18 additions & 0 deletions src/libCli/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ namespace cli
return m_connections[f_serverAddress].descPool;
}

grpc::Status ConnectionManager::closeDescDbWithDeadline(std::string f_serverAddress,
std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
{
if (m_connections[f_serverAddress].descDbProxy == nullptr)
{
std::cerr << "Error: Unable to close DescDb connection!" << std::endl;
return grpc::Status( grpc::StatusCode::ABORTED, "descDbProxy has not been initialized.");
}

//if proxy exists close the stream with a deadline.
grpc::Status status = m_connections[f_serverAddress].descDbProxy->closeDescDbStream(deadline);

//delete the proxy, findChannelByAddress() protects from accessing uninitialzed DbProxy.
m_connections[f_serverAddress].descDbProxy.reset();

return status;
}

void ConnectionManager::ensureDescDbProxyAndDescPoolIsAvailable(std::string &f_serverAddress, ArgParse::ParsedElement &f_parseTree)
{
if (m_connections[f_serverAddress].channel)
Expand Down
8 changes: 8 additions & 0 deletions src/libCli/libCli/ConnectionManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ namespace cli
/// @returns the gRpc DescriptorPool of the corresponding server address.
std::shared_ptr<grpc::protobuf::DescriptorPool> getDescPool(std::string f_serverAddress, ArgParse::ParsedElement &f_parseTree);

/// @brief closes the DescDb stream with a given deadline.
/// @param f_serverAddress server addresss to lookup the assigned DescDbProxy.
/// @param deadline optional dealine for closing the stream.
/// @return returns grpc::StatusCode::ABORTED status if no DescDb proxy is attached to the server address,
/// otherwise grpc status as a result of stream closure.
grpc::Status closeDescDbWithDeadline(std::string f_serverAddress,
std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);

private:
ConnectionManager() {}
~ConnectionManager() {}
Expand Down
16 changes: 15 additions & 1 deletion src/libLocalDescriptorCache/DescDbProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,26 @@ void DescDbProxy::getDescriptors(const std::string &f_hostAddress)
}
}

grpc::Status DescDbProxy::closeDescDbStream(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
{
if ( m_reflectionDescDb == nullptr )
{
if( m_disableCache == false )//cache enabled, no reflection stream required.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about just remove the check and return once m_reflectionDescDb == nullptr yields true? (meaning replace whe whole outer if body with a return;)

Sotty github woudn't let me write a suggestion for this.

Copy link
Author

@abb3r abb3r Nov 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, the unique ptr == null garauntees the stream is already closed. that is actually a valid case.

{
return grpc::Status::OK;
}
std::cerr << "Exit - no reflectionDescDb initialized." <<std::endl;
exit(EXIT_FAILURE);
}
return m_reflectionDescDb->closeStreamWithDeadline(deadline);
}

DescDbProxy::DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr<grpc::Channel> channel,
ArgParse::ParsedElement &parseTree)
{
m_channel = channel;
m_parseTree = parseTree;

m_disableCache = disableCache;
if(disableCache)
{
// Get Desc directly via reflection and without touching localDB
Expand Down
7 changes: 7 additions & 0 deletions src/libLocalDescriptorCache/DescDbProxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <string>
#include <time.h>
#include <set>
#include <optional>

#include <grpcpp/grpcpp.h>
#include <gRPC_utils/proto_reflection_descriptor_database.h>
Expand Down Expand Up @@ -56,6 +57,11 @@ class DescDbProxy : public grpc::protobuf::DescriptorDatabase{
/// Stores DescDB acquired via sever reflection locally as a DB file in proto3 structure.
/// @param hostAdress Address to the current host
void getDescriptors(const std::string &hostAddress);

/// @brief close the DescDb stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely.
/// @param deadline optional deadline to close the DescDb stream.
/// @return return grpc status as a result of call the finish() on the DescDb stream.
grpc::Status closeDescDbStream(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);

DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr<grpc::Channel> channel, ArgParse::ParsedElement &parseTree);

Expand Down Expand Up @@ -112,5 +118,6 @@ class DescDbProxy : public grpc::protobuf::DescriptorDatabase{
std::vector<const grpc::protobuf::FileDescriptor*>m_descList;
std::set<std::string> m_descNames;
std::vector<grpc::string> m_serviceList;
bool m_disableCache;
};

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
// MODIFIED by IBM (Rainer Schoenberger)
// original: #include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h"
// MODIFIED by IBM (Fabian Pfeifroth-Brumm)
// MODIFIED by IBM (Rahman Abber Tahir)
#include <optional>
#include "reflection.grpc.pb.h"
// END MODIFIED

Expand Down Expand Up @@ -80,6 +82,11 @@ class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase {
// Provide a list of full names of registered services
bool GetServices(std::vector<grpc::string>* output);

/// @brief close the reflection stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely.
/// @param deadline optional deadline to close the reflection stream.
/// @return return grpc status as a result of call the finish() on the reflection stream.
grpc::Status closeStreamWithDeadline(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);

private:
typedef ClientReaderWriter<
grpc::reflection::v1alpha::ServerReflectionRequest,
Expand All @@ -98,6 +105,8 @@ class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase {
const grpc::reflection::v1alpha::ServerReflectionRequest& request,
grpc::reflection::v1alpha::ServerReflectionResponse& response);

grpc::Status closeStream();

std::shared_ptr<ClientStream> stream_;
grpc::ClientContext ctx_;
std::unique_ptr<grpc::reflection::v1alpha::ServerReflection::Stub> stub_;
Expand Down
58 changes: 40 additions & 18 deletions third_party/gRPC_utils/proto_reflection_descriptor_database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// MODIFIED by IBM (Rainer Schoenberger)
// original: #include "test/cpp/util/proto_reflection_descriptor_database.h"
#include "proto_reflection_descriptor_database.h"
// MODIFIED by IBM (Rahman Abber Tahir)
// END MODIFIED

#include <vector>
Expand All @@ -42,24 +43,7 @@ ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase(
: stub_(ServerReflection::NewStub(channel)) {}

ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {
if (stream_) {
stream_->WritesDone();
Status status = stream_->Finish();
if (!status.ok()) {
if (status.error_code() == StatusCode::UNIMPLEMENTED) {
fprintf(stderr,
"Reflection request not implemented; "
"is the ServerReflection service enabled?\n");
} else {
fprintf(stderr,
"ServerReflectionInfo rpc failed. Error code: %d, message: %s, "
"debug info: %s\n",
static_cast<int>(status.error_code()),
status.error_message().c_str(),
ctx_.debug_error_string().c_str());
}
}
}
closeStream();
}

bool ProtoReflectionDescriptorDatabase::FindFileByName(
Expand Down Expand Up @@ -333,4 +317,42 @@ bool ProtoReflectionDescriptorDatabase::DoOneRequest(
return success;
}

grpc::Status ProtoReflectionDescriptorDatabase::closeStreamWithDeadline(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
{
stream_mutex_.lock();
if( deadline != std::nullopt )
{
ctx_.set_deadline(deadline.value());
}

auto status = closeStream();
stream_.reset();
stream_mutex_.unlock();
return status;
}

grpc::Status ProtoReflectionDescriptorDatabase::closeStream()
{
Status status;
if (stream_) {
stream_->WritesDone();
status = stream_->Finish();
if (!status.ok()) {
if (status.error_code() == StatusCode::UNIMPLEMENTED) {
fprintf(stderr,
"Reflection request not implemented; "
"is the ServerReflection service enabled?\n");
} else {
fprintf(stderr,
"ServerReflectionInfo rpc failed. Error code: %d, message: %s, "
"debug info: %s\n",
static_cast<int>(status.error_code()),
status.error_message().c_str(),
ctx_.debug_error_string().c_str());
}
}
}
return status;
}

} // namespace grpc
Loading