From a92b08841ce9ed3cd94174ac142fec7b08bfebed Mon Sep 17 00:00:00 2001 From: weimch Date: Fri, 5 Jan 2024 10:48:51 +0800 Subject: [PATCH] BugFix: fix trpc stream crash at client when connect failed with high frequency --- .../trpc_client_stream_connection_handler.cc | 16 +++++++++------- .../trpc/trpc_client_stream_connection_handler.h | 4 +++- trpc/stream/trpc/trpc_client_stream_handler.cc | 4 ++++ trpc/stream/trpc/trpc_server_stream_handler.cc | 4 ++++ .../fiber_tcp_conn_complex_connector.cc | 9 +++++---- 5 files changed, 25 insertions(+), 12 deletions(-) diff --git a/trpc/stream/trpc/trpc_client_stream_connection_handler.cc b/trpc/stream/trpc/trpc_client_stream_connection_handler.cc index dddc27ed..dd5b74a0 100644 --- a/trpc/stream/trpc/trpc_client_stream_connection_handler.cc +++ b/trpc/stream/trpc/trpc_client_stream_connection_handler.cc @@ -26,13 +26,8 @@ void FiberTrpcClientStreamConnectionHandler::Init() { // obtained from the factory to prevent repeated judgments and obtain non-NULL values later on. client_codec_ = ClientCodecFactory::GetInstance()->Get(GetTransInfo()->protocol); TRPC_ASSERT(client_codec_ && "trpc client codec not registered"); -} -StreamHandlerPtr FiberTrpcClientStreamConnectionHandler::GetOrCreateStreamHandler() { - std::unique_lock _(mutex_); - // The `stream_handler` only needs to be created when a client calls the stream interface on this connection for the - // first time. Whether the `stream_handler` is assigned can serve as a flag for distinguishing between stream and - // unary calls. + // Create stream handler if (!stream_handler_) { StreamOptions options; options.server_mode = false; @@ -46,12 +41,19 @@ StreamHandlerPtr FiberTrpcClientStreamConnectionHandler::GetOrCreateStreamHandle stream_handler_ = ClientStreamHandlerFactory::GetInstance()->Create(GetTransInfo()->protocol, std::move(options)); stream_handler_->Init(); } +} + +StreamHandlerPtr FiberTrpcClientStreamConnectionHandler::GetOrCreateStreamHandler() { + // When using fiber,stream_handler need to be created before. + // Because with network connecting failed frequently, Stop/Join of this connection handler may be invoking before + // GetOrCreateStreamHandler. This abnormal situation will cause Stop/Join of stream_handler not being invoked. + use_stream_ = true; return stream_handler_; } bool FiberTrpcClientStreamConnectionHandler::HandleMessage(const ConnectionPtr& conn, std::deque& rsp_list) { // Unary response. - if (!stream_handler_) { + if (!use_stream_) { return FiberClientConnectionHandler::HandleMessage(conn, rsp_list); } diff --git a/trpc/stream/trpc/trpc_client_stream_connection_handler.h b/trpc/stream/trpc/trpc_client_stream_connection_handler.h index 33e3fe71..6cb66bab 100644 --- a/trpc/stream/trpc/trpc_client_stream_connection_handler.h +++ b/trpc/stream/trpc/trpc_client_stream_connection_handler.h @@ -46,7 +46,9 @@ class FiberTrpcClientStreamConnectionHandler : public FiberClientStreamConnectio /// Used to distinguish between streaming and non-streaming packets in mixed packet scenarios. ClientCodecPtr client_codec_{nullptr}; StreamHandlerPtr stream_handler_{nullptr}; - FiberMutex mutex_; + /// Flag used to avoids connection MessageHandle doing stream frame distrubution when stream is not needed. + /// Default to false, true if GetOrCreateStreamHandler being invoked. + bool use_stream_{false}; }; /// @brief Implementation of stream connection handler for tRPC client stream which works in [SEPARATE/MERGE] thread diff --git a/trpc/stream/trpc/trpc_client_stream_handler.cc b/trpc/stream/trpc/trpc_client_stream_handler.cc index be72c2b6..e9811d7d 100644 --- a/trpc/stream/trpc/trpc_client_stream_handler.cc +++ b/trpc/stream/trpc/trpc_client_stream_handler.cc @@ -34,6 +34,10 @@ StreamReaderWriterProviderPtr TrpcClientStreamHandler::CreateStream(StreamOption stream->SetFilterController(&filter_controller_); return CriticalSection>([this, stream_id, stream]() { + if (conn_closed_) { + TRPC_LOG_ERROR("connection will be closed, reject creation of new stream: " << stream_id); + return RefPtr(); + } auto found = streams_.find(stream_id); if (found != streams_.end()) { TRPC_LOG_ERROR("stream " << stream_id << " already exists."); diff --git a/trpc/stream/trpc/trpc_server_stream_handler.cc b/trpc/stream/trpc/trpc_server_stream_handler.cc index 08cd5e17..36fabbd8 100644 --- a/trpc/stream/trpc/trpc_server_stream_handler.cc +++ b/trpc/stream/trpc/trpc_server_stream_handler.cc @@ -40,6 +40,10 @@ StreamReaderWriterProviderPtr TrpcServerStreamHandler::CreateStream(StreamOption context->SetStreamReaderWriterProvider(stream); return CriticalSection>([this, stream_id, stream]() { + if (conn_closed_) { + TRPC_LOG_ERROR("connection will be closed, reject creation of new stream: " << stream_id); + return RefPtr(); + } auto found = streams_.find(stream_id); if (found != streams_.end()) { TRPC_LOG_ERROR("stream " << stream_id << " already exists."); diff --git a/trpc/transport/client/fiber/conn_complex/fiber_tcp_conn_complex_connector.cc b/trpc/transport/client/fiber/conn_complex/fiber_tcp_conn_complex_connector.cc index 75f40218..61423cff 100644 --- a/trpc/transport/client/fiber/conn_complex/fiber_tcp_conn_complex_connector.cc +++ b/trpc/transport/client/fiber/conn_complex/fiber_tcp_conn_complex_connector.cc @@ -134,10 +134,11 @@ void FiberTcpConnComplexConnector::ConnectionCleanFunction(Connection* conn) { SetHealthy(false); RefPtr connector(ref_ptr, this); - bool flag = options_.connector_group->DelConnector(this); - if (!flag) { - return; - } + + // When connector becomes unhealthy, it may be deleted by another thread at GetOrCreate in ConnectorGroup. + // DelConnector invoking here may fail, but still need ClearResource. + // The atomic variable cleanup_ will keep ClearResource being invoked only once. + options_.connector_group->DelConnector(this); if (cleanup_.exchange(true)) { return;