diff --git a/src/libCli/Call.cpp b/src/libCli/Call.cpp index e5dbef8..287b68f 100644 --- a/src/libCli/Call.cpp +++ b/src/libCli/Call.cpp @@ -189,7 +189,19 @@ namespace cli deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(customTimeoutMs); } } - + + //before calling the RPC, close the DescDb connection with a timeout. + grpc::Status dbDescStatus = ConnectionManager::getInstance().closeDescDbWithDeadline(serverAddress, deadline); + if (not dbDescStatus.ok()) + { + std::cerr << "Failed to close reflection stream ;( Status code: " << std::to_string(dbDescStatus.error_code()) << " " << cli::getGrpcStatusCodeAsString(dbDescStatus.error_code()) << ", error message: " << dbDescStatus.error_message() << std::endl; + if(dbDescStatus.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED) + { + std::cerr << "Note: You can increase the deadline by setting the --rpcTimeoutMilliseconds option to a number or 'None'." << std::endl; + } + return -1; + } + grpc::testing::CliCall call(channel, methodStr, clientMetadata, deadline); auto messageFormatter = createMessageFormatter(parseTree); diff --git a/src/libCli/ConnectionManager.cpp b/src/libCli/ConnectionManager.cpp index 74b8732..5e1c084 100644 --- a/src/libCli/ConnectionManager.cpp +++ b/src/libCli/ConnectionManager.cpp @@ -65,6 +65,24 @@ namespace cli return m_connections[f_serverAddress].descPool; } + grpc::Status ConnectionManager::closeDescDbWithDeadline(std::string f_serverAddress, + std::optional> deadline) + { + if (m_connections[f_serverAddress].descDbProxy == nullptr) + { + std::cerr << "Error: Unable to close DescDb connection!" << std::endl; + return grpc::Status( grpc::StatusCode::ABORTED, "descDbProxy has not been initialized."); + } + + //if proxy exists close the stream with a deadline. + grpc::Status status = m_connections[f_serverAddress].descDbProxy->closeDescDbStream(deadline); + + //delete the proxy, findChannelByAddress() protects from accessing uninitialzed DbProxy. + m_connections[f_serverAddress].descDbProxy.reset(); + + return status; + } + void ConnectionManager::ensureDescDbProxyAndDescPoolIsAvailable(std::string &f_serverAddress, ArgParse::ParsedElement &f_parseTree) { if (m_connections[f_serverAddress].channel) diff --git a/src/libCli/libCli/ConnectionManager.hpp b/src/libCli/libCli/ConnectionManager.hpp index a951e5d..2515af3 100644 --- a/src/libCli/libCli/ConnectionManager.hpp +++ b/src/libCli/libCli/ConnectionManager.hpp @@ -52,6 +52,14 @@ namespace cli /// @returns the gRpc DescriptorPool of the corresponding server address. std::shared_ptr getDescPool(std::string f_serverAddress, ArgParse::ParsedElement &f_parseTree); + /// @brief closes the DescDb stream with a given deadline. + /// @param f_serverAddress server addresss to lookup the assigned DescDbProxy. + /// @param deadline optional dealine for closing the stream. + /// @return returns grpc::StatusCode::ABORTED status if no DescDb proxy is attached to the server address, + /// otherwise grpc status as a result of stream closure. + grpc::Status closeDescDbWithDeadline(std::string f_serverAddress, + std::optional> deadline); + private: ConnectionManager() {} ~ConnectionManager() {} diff --git a/src/libLocalDescriptorCache/DescDbProxy.cpp b/src/libLocalDescriptorCache/DescDbProxy.cpp index a8e25ed..e274398 100644 --- a/src/libLocalDescriptorCache/DescDbProxy.cpp +++ b/src/libLocalDescriptorCache/DescDbProxy.cpp @@ -350,12 +350,21 @@ void DescDbProxy::getDescriptors(const std::string &f_hostAddress) } } +grpc::Status DescDbProxy::closeDescDbStream(std::optional> deadline) +{ + if ( m_reflectionDescDb == nullptr ) + { + return grpc::Status::OK; + } + return m_reflectionDescDb->closeStreamWithDeadline(deadline); +} + DescDbProxy::DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr channel, ArgParse::ParsedElement &parseTree) { m_channel = channel; m_parseTree = parseTree; - + m_disableCache = disableCache; if(disableCache) { // Get Desc directly via reflection and without touching localDB diff --git a/src/libLocalDescriptorCache/DescDbProxy.hpp b/src/libLocalDescriptorCache/DescDbProxy.hpp index 7d188c5..f8652a9 100644 --- a/src/libLocalDescriptorCache/DescDbProxy.hpp +++ b/src/libLocalDescriptorCache/DescDbProxy.hpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -56,6 +57,11 @@ class DescDbProxy : public grpc::protobuf::DescriptorDatabase{ /// Stores DescDB acquired via sever reflection locally as a DB file in proto3 structure. /// @param hostAdress Address to the current host void getDescriptors(const std::string &hostAddress); + + /// @brief close the DescDb stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely. + /// @param deadline optional deadline to close the DescDb stream. + /// @return return grpc status as a result of call the finish() on the DescDb stream. + grpc::Status closeDescDbStream(std::optional> deadline); DescDbProxy(bool disableCache, const std::string &hostAddress, std::shared_ptr channel, ArgParse::ParsedElement &parseTree); @@ -112,5 +118,6 @@ class DescDbProxy : public grpc::protobuf::DescriptorDatabase{ std::vectorm_descList; std::set m_descNames; std::vector m_serviceList; + bool m_disableCache; }; diff --git a/third_party/gRPC_utils/gRPC_utils/proto_reflection_descriptor_database.h b/third_party/gRPC_utils/gRPC_utils/proto_reflection_descriptor_database.h index 319432e..41cdf22 100644 --- a/third_party/gRPC_utils/gRPC_utils/proto_reflection_descriptor_database.h +++ b/third_party/gRPC_utils/gRPC_utils/proto_reflection_descriptor_database.h @@ -28,6 +28,8 @@ // MODIFIED by IBM (Rainer Schoenberger) // original: #include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h" // MODIFIED by IBM (Fabian Pfeifroth-Brumm) +// MODIFIED by IBM (Rahman Abber Tahir) +#include #include "reflection.grpc.pb.h" // END MODIFIED @@ -80,6 +82,11 @@ class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase { // Provide a list of full names of registered services bool GetServices(std::vector* output); + /// @brief close the reflection stream with a given deadline. If the dealine is not set it waits for the stream to close indefinitely. + /// @param deadline optional deadline to close the reflection stream. + /// @return return grpc status as a result of call the finish() on the reflection stream. + grpc::Status closeStreamWithDeadline(std::optional> deadline); + private: typedef ClientReaderWriter< grpc::reflection::v1alpha::ServerReflectionRequest, @@ -98,6 +105,8 @@ class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase { const grpc::reflection::v1alpha::ServerReflectionRequest& request, grpc::reflection::v1alpha::ServerReflectionResponse& response); + grpc::Status closeStream(); + std::shared_ptr stream_; grpc::ClientContext ctx_; std::unique_ptr stub_; diff --git a/third_party/gRPC_utils/proto_reflection_descriptor_database.cc b/third_party/gRPC_utils/proto_reflection_descriptor_database.cc index 9cab5a0..ed374c0 100644 --- a/third_party/gRPC_utils/proto_reflection_descriptor_database.cc +++ b/third_party/gRPC_utils/proto_reflection_descriptor_database.cc @@ -19,6 +19,7 @@ // MODIFIED by IBM (Rainer Schoenberger) // original: #include "test/cpp/util/proto_reflection_descriptor_database.h" #include "proto_reflection_descriptor_database.h" +// MODIFIED by IBM (Rahman Abber Tahir) // END MODIFIED #include @@ -42,24 +43,7 @@ ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase( : stub_(ServerReflection::NewStub(channel)) {} ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() { - if (stream_) { - stream_->WritesDone(); - Status status = stream_->Finish(); - if (!status.ok()) { - if (status.error_code() == StatusCode::UNIMPLEMENTED) { - fprintf(stderr, - "Reflection request not implemented; " - "is the ServerReflection service enabled?\n"); - } else { - fprintf(stderr, - "ServerReflectionInfo rpc failed. Error code: %d, message: %s, " - "debug info: %s\n", - static_cast(status.error_code()), - status.error_message().c_str(), - ctx_.debug_error_string().c_str()); - } - } - } + closeStream(); } bool ProtoReflectionDescriptorDatabase::FindFileByName( @@ -333,4 +317,42 @@ bool ProtoReflectionDescriptorDatabase::DoOneRequest( return success; } +grpc::Status ProtoReflectionDescriptorDatabase::closeStreamWithDeadline(std::optional> deadline) +{ + stream_mutex_.lock(); + if( deadline != std::nullopt ) + { + ctx_.set_deadline(deadline.value()); + } + + auto status = closeStream(); + stream_.reset(); + stream_mutex_.unlock(); + return status; +} + +grpc::Status ProtoReflectionDescriptorDatabase::closeStream() +{ + Status status; + if (stream_) { + stream_->WritesDone(); + status = stream_->Finish(); + if (!status.ok()) { + if (status.error_code() == StatusCode::UNIMPLEMENTED) { + fprintf(stderr, + "Reflection request not implemented; " + "is the ServerReflection service enabled?\n"); + } else { + fprintf(stderr, + "ServerReflectionInfo rpc failed. Error code: %d, message: %s, " + "debug info: %s\n", + static_cast(status.error_code()), + status.error_message().c_str(), + ctx_.debug_error_string().c_str()); + } + } + } + return status; +} + } // namespace grpc