Skip to content

Commit

Permalink
add hard-code deadline for refelection stream.
Browse files Browse the repository at this point in the history
Signed-off-by: Rahman Abber Tahir <[email protected]>
  • Loading branch information
Rahman Abber Tahir committed Nov 22, 2023
1 parent 2042791 commit 42436ae
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 27 deletions.
9 changes: 3 additions & 6 deletions src/libCli/Call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,11 @@ namespace cli
}

//before calling the RPC, close the DescDb connection with a timeout.
grpc::Status dbDescStatus = ConnectionManager::getInstance().closeDescDbWithDeadline(serverAddress, deadline);
grpc::Status dbDescStatus = ConnectionManager::getInstance().closeDescDbStream(serverAddress);
if (not dbDescStatus.ok())
{
std::cerr << "Failed to close reflection stream ;( Status code: " << std::to_string(dbDescStatus.error_code()) << " " << cli::getGrpcStatusCodeAsString(dbDescStatus.error_code()) << ", error message: " << dbDescStatus.error_message() << std::endl;
if(dbDescStatus.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED)
{
std::cerr << "Note: You can increase the deadline by setting the --rpcTimeoutMilliseconds option to a number or 'None'." << std::endl;
}
std::cerr << "Failed to close reflection stream ;( Pls try again." << std::endl;
std::cerr << "Status code: " << std::to_string(dbDescStatus.error_code()) << " " << cli::getGrpcStatusCodeAsString(dbDescStatus.error_code()) << ", error message: " << dbDescStatus.error_message() << std::endl;
return -1;
}

Expand Down
5 changes: 2 additions & 3 deletions src/libCli/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ namespace cli
return m_connections[f_serverAddress].descPool;
}

grpc::Status ConnectionManager::closeDescDbWithDeadline(std::string f_serverAddress,
std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
grpc::Status ConnectionManager::closeDescDbStream(std::string f_serverAddress)
{
if (m_connections[f_serverAddress].descDbProxy == nullptr)
{
Expand All @@ -75,7 +74,7 @@ namespace cli
}

//if proxy exists close the stream with a deadline.
grpc::Status status = m_connections[f_serverAddress].descDbProxy->closeDescDbStream(deadline);
grpc::Status status = m_connections[f_serverAddress].descDbProxy->closeDescDbStream();

//delete the proxy, findChannelByAddress() protects from accessing uninitialzed DbProxy.
m_connections[f_serverAddress].descDbProxy.reset();
Expand Down
6 changes: 2 additions & 4 deletions src/libCli/libCli/ConnectionManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ namespace cli
/// @returns the gRpc DescriptorPool of the corresponding server address.
std::shared_ptr<grpc::protobuf::DescriptorPool> getDescPool(std::string f_serverAddress, ArgParse::ParsedElement &f_parseTree);

/// @brief closes the DescDb stream with a given deadline.
/// @brief closes the DescDb stream with a default deadline.
/// @param f_serverAddress server addresss to lookup the assigned DescDbProxy.
/// @param deadline optional dealine for closing the stream.
/// @return returns grpc::StatusCode::ABORTED status if no DescDb proxy is attached to the server address,
/// otherwise grpc status as a result of stream closure.
grpc::Status closeDescDbWithDeadline(std::string f_serverAddress,
std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);
grpc::Status closeDescDbStream(std::string f_serverAddress);

private:
ConnectionManager() {}
Expand Down
5 changes: 2 additions & 3 deletions src/libLocalDescriptorCache/DescDbProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,21 +372,20 @@ void DescDbProxy::getDescriptors(const std::string &f_hostAddress)
}
}

grpc::Status DescDbProxy::closeDescDbStream(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
grpc::Status DescDbProxy::closeDescDbStream()
{
if ( m_reflectionDescDb == nullptr )
{
return grpc::Status::OK;
}
return m_reflectionDescDb->closeStreamWithDeadline(deadline);
return m_reflectionDescDb->closeDescDbStream();
}

DescDbProxy::DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr<grpc::Channel> channel,
ArgParse::ParsedElement &parseTree)
{
m_channel = channel;
m_parseTree = parseTree;
m_disableCache = disableCache;
if(disableCache)
{
// Get Desc directly via reflection and without touching localDB
Expand Down
5 changes: 2 additions & 3 deletions src/libLocalDescriptorCache/DescDbProxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ class DescDbProxy : public grpc::protobuf::DescriptorDatabase{
/// @param hostAdress Address to the current host
void getDescriptors(const std::string &hostAddress);

/// @brief close the DescDb stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely.
/// @param deadline optional deadline to close the DescDb stream.
/// @brief close the DescDb stream with a default deadline.
/// @return return grpc status as a result of call the finish() on the DescDb stream.
grpc::Status closeDescDbStream(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);
grpc::Status closeDescDbStream();

DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr<grpc::Channel> channel, ArgParse::ParsedElement &parseTree);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase {
// Provide a list of full names of registered services
bool GetServices(std::vector<grpc::string>* output);

/// @brief close the reflection stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely.
/// @param deadline optional deadline to close the reflection stream.
/// @brief close the reflection stream with a default deadline.
/// @return return grpc status as a result of call the finish() on the reflection stream.
grpc::Status closeStreamWithDeadline(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline);
grpc::Status closeDescDbStream();

private:
typedef ClientReaderWriter<
Expand Down
14 changes: 9 additions & 5 deletions third_party/gRPC_utils/proto_reflection_descriptor_database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ using grpc::reflection::v1alpha::ServerReflection;
using grpc::reflection::v1alpha::ServerReflectionRequest;
using grpc::reflection::v1alpha::ServerReflectionResponse;

const uint8_t g_timeoutGrpcMainStreamSeconds = 10; //using default gwhisper timeout of 10 seconds.
namespace grpc {

ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase(
Expand Down Expand Up @@ -300,6 +301,9 @@ void ProtoReflectionDescriptorDatabase::AddFileFromResponse(
const std::shared_ptr<ProtoReflectionDescriptorDatabase::ClientStream>
ProtoReflectionDescriptorDatabase::GetStream() {
if (!stream_) {
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::seconds(g_timeoutGrpcMainStreamSeconds);
ctx_.set_deadline(deadline);
stream_ = stub_->ServerReflectionInfo(&ctx_);
}
return stream_;
Expand All @@ -317,16 +321,13 @@ bool ProtoReflectionDescriptorDatabase::DoOneRequest(
return success;
}

grpc::Status ProtoReflectionDescriptorDatabase::closeStreamWithDeadline(std::optional<std::chrono::time_point<std::chrono::system_clock>> deadline)
grpc::Status ProtoReflectionDescriptorDatabase::closeDescDbStream()
{
stream_mutex_.lock();
if( deadline != std::nullopt )
{
ctx_.set_deadline(deadline.value());
}

auto status = closeStream();
stream_.reset();

stream_mutex_.unlock();
return status;
}
Expand All @@ -342,6 +343,9 @@ grpc::Status ProtoReflectionDescriptorDatabase::closeStream()
fprintf(stderr,
"Reflection request not implemented; "
"is the ServerReflection service enabled?\n");
} else if (status.error_code() == StatusCode::DEADLINE_EXCEEDED) {
fprintf(stderr,
"ServerReflectionInfo rpc failed. Grpc Server failed to close the stream within %d seconds.\n", g_timeoutGrpcMainStreamSeconds);
} else {
fprintf(stderr,
"ServerReflectionInfo rpc failed. Error code: %d, message: %s, "
Expand Down

0 comments on commit 42436ae

Please sign in to comment.