diff --git a/src/infer_request.cc b/src/infer_request.cc
index 31182281..fc1e4206 100644
--- a/src/infer_request.cc
+++ b/src/infer_request.cc
@@ -402,13 +402,6 @@ InferRequest::IsCancelled()
 std::shared_ptr<ResponseSender>
 InferRequest::GetResponseSender()
 {
-  std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
-  if (!stub->IsDecoupled()) {
-    throw PythonBackendException(
-        "'get_response_sender' function must be called only when the model is "
-        "using the decoupled transaction policy.");
-  }
-
   return response_sender_;
 }
 
diff --git a/src/pb_stub.cc b/src/pb_stub.cc
index 56d466f5..0e288e68 100644
--- a/src/pb_stub.cc
+++ b/src/pb_stub.cc
@@ -402,11 +402,7 @@ Stub::RunCommand()
           shm_pool_->Load<char>(ipc_message->Args());
       RequestBatch* request_batch_shm_ptr =
           reinterpret_cast<RequestBatch*>(request_batch.data_.get());
-      if (!ipc_control_->decoupled) {
-        ProcessRequests(request_batch_shm_ptr);
-      } else {
-        ProcessRequestsDecoupled(request_batch_shm_ptr);
-      }
+      ProcessRequests(request_batch_shm_ptr);
 
     } break;
     case PYTHONSTUB_CommandType::PYTHONSTUB_FinalizeRequest:
@@ -597,18 +593,6 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
   initialized_ = true;
 }
 
-void
-Stub::ProcessResponse(InferResponse* response)
-{
-  response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */);
-
-  for (auto& output_tensor : response->OutputTensors()) {
-    if (!output_tensor->IsCPU()) {
-      gpu_tensors_.push_back(output_tensor);
-    }
-  }
-}
-
 void
 Stub::LoadGPUBuffers(std::unique_ptr<IPCMessage>& ipc_message)
 {
@@ -682,7 +666,7 @@ Stub::LoadRequestsFromSharedMemory(RequestBatch* request_batch_shm_ptr)
 }
 
 void
-Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)
+Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
 {
   py::list py_request_list =
       LoadRequestsFromSharedMemory(request_batch_shm_ptr);
@@ -718,18 +702,21 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)
       py::object execute_return =
          model_instance_.attr("execute")(py_request_list);
+
       bool is_coroutine = py::module::import("asyncio")
                               .attr("iscoroutine")(execute_return)
                               .cast<bool>();
       if (is_coroutine) {
-        RunCoroutine(execute_return);
-      } else {
-        if (!py::isinstance<py::none>(execute_return)) {
-          throw PythonBackendException(
-              "Python model '" + name_ +
-              "' is using the decoupled mode and the execute function must "
-              "return None.");
+        if (IsDecoupled()) {
+          // Do not wait for async decoupled execute to return.
+          RunCoroutine(execute_return, true /* in_background */);
+        } else {
+          py::object coroutine_return =
+              RunCoroutine(execute_return, false /* in_background */);
+          ProcessReturnedResponses(py_request_list, coroutine_return);
         }
+      } else {
+        ProcessReturnedResponses(py_request_list, execute_return);
       }
     }
   }
@@ -757,151 +744,60 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)
 }
 
 void
-Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
+Stub::ProcessReturnedResponses(
+    py::list py_requests, py::object py_responses_obj)
 {
-  std::unique_ptr<IPCMessage> execute_response =
-      IPCMessage::Create(shm_pool_, false /* Inline response */);
-  execute_response->Command() = PYTHONSTUB_ExecuteResponse;
-
-  AllocatedSharedMemory<char> response_batch = shm_pool_->Construct<char>(
-      request_batch_shm_ptr->batch_size *
-          sizeof(bi::managed_external_buffer::handle_t) +
-      sizeof(ResponseBatch));
-  ResponseBatch* response_batch_shm_ptr =
-      reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
-
-  std::unique_ptr<PbString> error_string_shm;
-  py::list inference_responses;
-
-  bi::managed_external_buffer::handle_t* responses_shm_handle =
-      reinterpret_cast<bi::managed_external_buffer::handle_t*>(
-          response_batch.data_.get() + sizeof(ResponseBatch));
-
-  py::list responses;
-
-  // Notifying the stub should be after responses.
-  ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); });
-  ScopedDefer _(
-      [this, &execute_response] { SendIPCMessage(execute_response); });
-
-  execute_response->Args() = response_batch.handle_;
-
-  bool has_exception = false;
-  std::string error_string;
-  try {
-    response_batch_shm_ptr->has_error = false;
-    response_batch_shm_ptr->is_error_set = false;
-
-    uint32_t batch_size = request_batch_shm_ptr->batch_size;
-
-    if (batch_size == 0) {
-      return;
-    }
-
-    py::list py_request_list =
-        LoadRequestsFromSharedMemory(request_batch_shm_ptr);
-
-    if (!py::hasattr(model_instance_, "execute")) {
-      std::string message = "Python model " + model_context_.PythonModelPath() +
-                            " does not implement `execute` method.";
-      throw PythonBackendException(message);
-    }
-
-    py::object request_list = py_request_list;
-    py::module asyncio = py::module::import("asyncio");
-
-    // Execute Response
-    py::object execute_return;
-    py::object responses_obj;
-    bool is_coroutine;
-
-    {
-      NVTX_RANGE(nvtx_, "PyExecute " + name_);
-      execute_return = model_instance_.attr("execute")(request_list);
-      is_coroutine = asyncio.attr("iscoroutine")(execute_return).cast<bool>();
-    }
-
-    if (is_coroutine) {
-      responses_obj = asyncio.attr("run")(execute_return);
-    } else {
-      responses_obj = execute_return;
-    }
-
-    // Check the return type of execute function.
-    if (!py::isinstance<py::list>(responses_obj)) {
-      std::string str = py::str(execute_return.get_type());
-      throw PythonBackendException(
-          std::string("Expected a list in the execute return, found type '") +
-          str + "'.");
-    }
-
-    responses = responses_obj;
-    size_t response_size = py::len(responses);
-
-    // If the number of request objects do not match the number of
-    // response objects throw an error.
-    if (response_size != batch_size) {
-      std::string err =
-          "Number of InferenceResponse objects do not match the number "
-          "of "
-          "InferenceRequest objects. InferenceRequest(s) size is:" +
-          std::to_string(batch_size) + ", and InferenceResponse(s) size is:" +
-          std::to_string(response_size) + "\n";
-      throw PythonBackendException(err);
-    }
-
-    for (size_t i = 0; i < response_size; i++) {
-      // Check the return type of execute function.
-      InferRequest* infer_request = py_request_list[i].cast<InferRequest*>();
-      if (infer_request->ReleaseFlags() ==
-          TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
-        if (!py::isinstance<py::none>(responses[i])) {
-          // When the request is rescheduled in non-decoupled model, the
-          // response must be None.
-          std::string str = py::str(responses[i].get_type());
-          throw PythonBackendException(
-              "Expected a None object in the execute function return list for "
-              "reschduled request, "
-              "found type '" +
-              str + "'.");
-        }
-      } else {
-        if (!py::isinstance<InferResponse>(responses[i])) {
-          std::string str = py::str(responses[i].get_type());
-          throw PythonBackendException(
-              std::string(
-                  "Expected an 'InferenceResponse' object in the execute "
-                  "function return list, found type '") +
-              str + "'.");
-        }
-        InferResponse* infer_response = responses[i].cast<InferResponse*>();
-        infer_response->PruneOutputTensors(
-            infer_request->RequestedOutputNames());
-        ProcessResponse(infer_response);
-        responses_shm_handle[i] = infer_response->ShmHandle();
-      }
-    }
-    response_batch_shm_ptr->batch_size = response_size;
+  // Return if there is nothing to process.
+  if (py::isinstance<py::none>(py_responses_obj)) {
+    return;
   }
-  catch (const PythonBackendException& pb_exception) {
-    has_exception = true;
-    error_string = pb_exception.what();
+  // Only non-decoupled may return responses.
+  if (IsDecoupled()) {
+    throw PythonBackendException(
+        "Python model '" + name_ +
+        "' is using the decoupled mode and the execute function must return "
+        "None.");
   }
-  catch (const py::error_already_set& error) {
-    has_exception = true;
-    error_string = error.what();
+  // Check responses is a list.
+  if (!py::isinstance<py::list>(py_responses_obj)) {
+    throw PythonBackendException(
+        "Expected a list in the execute return, found type '" +
+        std::string(py::str(py_responses_obj.get_type())) + "'.");
+  }
+  py::list py_responses = py_responses_obj;
+  // Responses and requests length must match.
+  size_t requests_size = py::len(py_requests);
+  size_t responses_size = py::len(py_responses);
+  if (requests_size != responses_size) {
+    throw PythonBackendException(
+        "Number of InferenceResponse objects does not match the number of "
+        "InferenceRequest objects. InferenceRequest(s) size is:" +
+        std::to_string(requests_size) + ", and InferenceResponse(s) size is:" +
+        std::to_string(responses_size) + "\n");
   }
 
-  if (has_exception) {
-    std::string err_message =
-        std::string(
-            "Failed to process the request(s) for model '" + name_ +
-            "', message: ") +
-        error_string;
-    error_string_shm = PbString::Create(shm_pool_, error_string);
-    response_batch_shm_ptr->has_error = true;
-    response_batch_shm_ptr->is_error_set = true;
-    response_batch_shm_ptr->error = error_string_shm->ShmHandle();
+  for (size_t i = 0; i < responses_size; i++) {
+    if (!py::isinstance<py::none>(py_responses[i])) {
+      InferRequest* request = py_requests[i].cast<InferRequest*>();
+      // Response must be None if rescheduled.
+      if (request->ReleaseFlags() == TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
+        throw PythonBackendException(
+            "Expected a None object in the execute function return list for "
+            "rescheduled request, found type '" +
+            std::string(py::str(py_responses[i].get_type())) + "'.");
+      }
+      // Send the response.
+      if (!py::isinstance<InferResponse>(py_responses[i])) {
+        throw PythonBackendException(
+            "Expected an 'InferenceResponse' object in the execute function "
+            "return list, found type '" +
+            std::string(py::str(py_responses[i].get_type())) + "'.");
+      }
+      std::shared_ptr<InferResponse> response =
+          py_responses[i].cast<std::shared_ptr<InferResponse>>();
+      request->GetResponseSender()->Send(
+          response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
+    }
   }
 }
 
@@ -923,15 +819,19 @@ Stub::GetAsyncEventLoop()
   return async_event_loop_;
 }
 
-void
-Stub::RunCoroutine(py::object coroutine)
+py::object
+Stub::RunCoroutine(py::object coroutine, bool in_background)
 {
   py::object loop = GetAsyncEventLoop();
   py::object py_future = py::module_::import("asyncio").attr(
       "run_coroutine_threadsafe")(coroutine, loop);
-  py_future.attr("add_done_callback")(
-      py::module_::import("c_python_backend_utils")
-          .attr("async_event_future_done_callback"));
+  if (in_background) {
+    py_future.attr("add_done_callback")(
+        py::module_::import("c_python_backend_utils")
+            .attr("async_event_future_done_callback"));
+    return py::none();
+  }
+  return py_future.attr("result")();
 }
 
 void
diff --git a/src/pb_stub.h b/src/pb_stub.h
index c9462fd0..10e7606a 100644
--- a/src/pb_stub.h
+++ b/src/pb_stub.h
@@ -253,11 +253,12 @@ class Stub {
   /// Execute a batch of requests.
   void ProcessRequests(RequestBatch* request_batch_shm_ptr);
 
-  void ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr);
+  void ProcessReturnedResponses(
+      py::list py_requests, py::object py_responses_obj);
 
   py::object GetAsyncEventLoop();
 
-  void RunCoroutine(py::object coroutine);
+  py::object RunCoroutine(py::object coroutine, bool in_background);
 
   /// Get the memory manager message queue
   std::unique_ptr<MessageQueue<uint64_t>>& MemoryManagerQueue();
@@ -265,8 +266,6 @@ class Stub {
   /// Get the shared memory pool
   std::unique_ptr<SharedMemoryManager>& ShmPool() { return shm_pool_; }
 
-  void ProcessResponse(InferResponse* response);
-
   void ProcessBLSResponseDecoupled(std::unique_ptr<IPCMessage>& ipc_message);
 
   void LoadGPUBuffers(std::unique_ptr<IPCMessage>& ipc_message);
diff --git a/src/python_be.cc b/src/python_be.cc
index b95fb715..ce03adc7 100644
--- a/src/python_be.cc
+++ b/src/python_be.cc
@@ -153,124 +153,6 @@ ModelInstanceState::SetErrorForResponseSendMessage(
   }
 }
 
-void
-ModelInstanceState::SendMessageAndReceiveResponse(
-    bi::managed_external_buffer::handle_t message,
-    bi::managed_external_buffer::handle_t& response, bool& restart,
-    std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
-    TRITONBACKEND_Request** requests, const uint32_t request_count)
-{
-  auto error = SendMessageToStub(message);
-  if (error != nullptr) {
-    restart = true;
-    RespondErrorToAllRequests(
-        TRITONSERVER_ErrorMessage(error), responses, requests, request_count);
-
-    return;
-  }
-
-  bi::managed_external_buffer::handle_t response_message;
-  error = Stub()->ReceiveMessageFromStub(response_message);
-  if (error != nullptr) {
-    restart = true;
-    RespondErrorToAllRequests(
-        TRITONSERVER_ErrorMessage(error), responses, requests, request_count);
-
-    return;
-  }
-
-  response = response_message;
-}
-
-TRITONSERVER_Error*
-ModelInstanceState::SendMessageToStub(
-    bi::managed_external_buffer::handle_t message)
-{
-  bool success = false;
-  while (!success) {
-    uint64_t timeout_miliseconds = 1000;
-    {
-      boost::posix_time::ptime timeout =
-          boost::get_system_time() +
-          boost::posix_time::milliseconds(timeout_miliseconds);
-
-      bi::scoped_lock<bi::interprocess_mutex> lock(
-          *(Stub()->HealthMutex()), timeout);
-
-      // Check if lock has been acquired.
-      if (lock) {
-        Stub()->IpcControl()->stub_health = false;
-      } else {
-        // If it failed to obtain the lock, it means that the stub has been
-        // stuck or exited while holding the health mutex lock.
-        return TRITONSERVER_ErrorNew(
-            TRITONSERVER_ERROR_INTERNAL, "Failed to obtain the health mutex.");
-      }
-    }
-
-    Stub()->StubMessageQueue()->Push(
-        message, timeout_miliseconds /* duration ms */, success);
-
-    if (!success && !IsStubProcessAlive()) {
-      return TRITONSERVER_ErrorNew(
-          TRITONSERVER_ERROR_INTERNAL, "Stub process is not healthy.");
-    }
-  }
-
-  return nullptr;  // success
-}
-
-void
-ModelInstanceState::RespondErrorToAllRequests(
-    const char* message,
-    std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
-    TRITONBACKEND_Request** requests, const uint32_t request_count)
-{
-  for (uint32_t r = 0; r < request_count; ++r) {
-    if ((*responses)[r] == nullptr)
-      continue;
-
-    std::string err_message =
-        std::string(
-            "Failed to process the request(s) for model instance '" + Name() +
-            "', message: ") +
-        message;
-
-    TRITONSERVER_Error* err =
-        TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, err_message.c_str());
-    LOG_IF_ERROR(
-        TRITONBACKEND_ResponseSend(
-            (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err),
-        "failed sending response");
-
-    (*responses)[r] = nullptr;
-    TRITONSERVER_ErrorDelete(err);
-  }
-}
-
-void
-ModelInstanceState::WaitForBLSRequestsToFinish()
-{
-  futures_.clear();
-}
-
-bool
-ModelInstanceState::IsStubProcessAlive()
-{
-  boost::posix_time::ptime timeout =
-      boost::get_system_time() + boost::posix_time::seconds(1);
-  bi::scoped_lock<bi::interprocess_mutex> lock(*Stub()->HealthMutex(), timeout);
-
-  // Check if lock has been acquired.
-  if (lock) {
-    return Stub()->IpcControl()->stub_health;
-  } else {
-    // If it failed to obtain the lock, it means that the stub has been
-    // stuck or exited while holding the health mutex lock.
-    return false;
-  }
-}
-
 TRITONSERVER_Error*
 ModelInstanceState::SaveRequestsToSharedMemory(
     TRITONBACKEND_Request** requests, const uint32_t request_count,
@@ -408,24 +290,15 @@ ModelInstanceState::SaveRequestsToSharedMemory(
         request, &request_timeout));
 
     std::unique_ptr<InferRequest> infer_request;
-    if (model_state->IsDecoupled()) {
-      TRITONBACKEND_ResponseFactory* factory_ptr;
-      RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
-
-      infer_request = std::make_unique<InferRequest>(
-          id, correlation_id, pb_input_tensors, requested_output_names,
-          model_state->Name(), model_state->Version(), parameters_string, flags,
-          request_timeout, reinterpret_cast<intptr_t>(factory_ptr),
-          reinterpret_cast<intptr_t>(request),
-          PreferredMemory(PreferredMemory::kDefault, 0), trace);
-    } else {
-      infer_request = std::make_unique<InferRequest>(
-          id, correlation_id, pb_input_tensors, requested_output_names,
-          model_state->Name(), model_state->Version(), parameters_string, flags,
-          request_timeout, 0 /* response_factory_address */,
-          reinterpret_cast<intptr_t>(request),
-          PreferredMemory(PreferredMemory::kDefault, 0), trace);
-    }
+    TRITONBACKEND_ResponseFactory* factory_ptr;
+    RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
+
+    infer_request = std::make_unique<InferRequest>(
+        id, correlation_id, pb_input_tensors, requested_output_names,
+        model_state->Name(), model_state->Version(), parameters_string, flags,
+        request_timeout, reinterpret_cast<intptr_t>(factory_ptr),
+        reinterpret_cast<intptr_t>(request),
+        PreferredMemory(PreferredMemory::kDefault, 0), trace);
     RETURN_IF_EXCEPTION(infer_request->SaveToSharedMemory(Stub()->ShmPool()));
     requests_shm[r] = infer_request->ShmHandle();
     pb_infer_requests.emplace_back(std::move(infer_request));
@@ -449,11 +322,9 @@ ModelInstanceState::LaunchStubProcess()
   thread_pool_ = std::make_unique<boost::asio::thread_pool>(
       model_state->StateForBackend()->thread_pool_size);
 
-  if (model_state->IsDecoupled()) {
-    decoupled_thread_ = true;
-    decoupled_monitor_ =
-        std::thread(&ModelInstanceState::DecoupledMessageQueueMonitor, this);
-  }
+  decoupled_thread_ = true;
+  decoupled_monitor_ =
+      std::thread(&ModelInstanceState::DecoupledMessageQueueMonitor, this);
 
   request_executor_ = std::make_unique<RequestExecutor>(
       Stub()->ShmPool(), model_state->TritonServer());
@@ -1306,7 +1177,7 @@ ModelInstanceState::ResponseSendDecoupled(
 }
 
 TRITONSERVER_Error*
-ModelInstanceState::ProcessRequestsDecoupled(
+ModelInstanceState::ProcessRequests(
     TRITONBACKEND_Request** requests, const uint32_t request_count,
     std::vector<std::unique_ptr<InferRequest>>& pb_infer_requests,
     PbMetricReporter& reporter)
@@ -1382,364 +1253,6 @@ ModelInstanceState::ProcessRequestsDecoupled(
   return nullptr;  // success
 }
 
-void
-ModelInstanceState::ProcessRequests(
-    TRITONBACKEND_Request** requests, const uint32_t request_count,
-    std::vector<std::unique_ptr<InferRequest>>& pb_infer_requests,
-    bool& restart)
-{
-  NVTX_RANGE(nvtx_, "ProcessRequests " + Name());
-  ModelState* model_state = reinterpret_cast<ModelState*>(Model());
-  std::string name = model_state->Name();
-
-  LOG_MESSAGE(
-      TRITONSERVER_LOG_VERBOSE,
-      (std::string("model ") + model_state->Name() + ", instance " + Name() +
-       ", executing " + std::to_string(request_count) + " requests")
-          .c_str());
-
-  uint64_t exec_start_ns = 0;
-  SET_TIMESTAMP(exec_start_ns);
-
-  // We take the responsibility of the responses.
-  std::shared_ptr<std::vector<TRITONBACKEND_Response*>> responses(
-      new std::vector<TRITONBACKEND_Response*>());
-  responses->reserve(request_count);
-  PbMetricReporter reporter(
-      TritonModelInstance(), requests, request_count, responses);
-  reporter.SetExecStartNs(exec_start_ns);
-
-  for (size_t i = 0; i < request_count; i++) {
-    TRITONBACKEND_Response* response;
-    auto err = TRITONBACKEND_ResponseNew(&response, requests[i]);
-    if (err == nullptr) {
-      responses->emplace_back(response);
-    } else {
-      responses->emplace_back(nullptr);
-      LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response");
-      TRITONSERVER_ErrorDelete(err);
-    }
-  }
-
-  size_t total_batch_size = 0;
-  RESPOND_ALL_AND_RETURN_IF_ERROR(
-      responses, request_count,
-      CheckIncomingRequests(requests, request_count, total_batch_size));
-
-  // No request to process
-  if (total_batch_size == 0) {
-    return;
-  }
-
-  // Wait for all the pending BLS requests to be completed.
-  ScopedDefer bls_defer([this] { WaitForBLSRequestsToFinish(); });
-  AllocatedSharedMemory<char> request_batch;
-  RESPOND_ALL_AND_RETURN_IF_ERROR(
-      responses, request_count,
-      SaveRequestsToSharedMemory(
-          requests, request_count, pb_infer_requests, request_batch,
-          responses));
-
-  std::shared_ptr<IPCMessage> ipc_message =
-      IPCMessage::Create(Stub()->ShmPool(), false /*inline_response*/);
-  ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest;
-  ipc_message->Args() = request_batch.handle_;
-
-  uint64_t compute_start_ns = 0;
-  SET_TIMESTAMP(compute_start_ns);
-  reporter.SetComputeStartNs(compute_start_ns);
-
-  // This means that the stub process has exited and Python
-  // backend failed to restart the stub process.
-  if (!Stub()->StubActive()) {
-    const char* error_message = "The stub process has exited unexpectedly.";
-    RespondErrorToAllRequests(
-        error_message, responses, requests, request_count);
-    return;
-  }
-
-  bi::managed_external_buffer::handle_t response_message;
-  {
-    NVTX_RANGE(nvtx_, "StubProcessing " + Name());
-    SendMessageAndReceiveResponse(
-        ipc_message->ShmHandle(), response_message, restart, responses,
-        requests, request_count);
-  }
-
-  ScopedDefer execute_finalize([this, &restart] {
-    // Push a dummy message to the message queue so that
-    // the stub process is notified that it can release
-    // the object stored in shared memory.
-    NVTX_RANGE(nvtx_, "RequestExecuteFinalize " + Name());
-    if (!restart)
-      // Push a dummy message to signal the thread to terminate.
-      Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE);
-  });
-  if (restart) {
-    return;
-  }
-
-  RESPOND_ALL_AND_RETURN_IF_EXCEPTION(
-      responses, request_count,
-      ipc_message = IPCMessage::LoadFromSharedMemory(
-          Stub()->ShmPool(), response_message));
-
-  // If the stub command is no longer PYTHONSTUB_InferExecRequest, it indicates
-  // that inference request execution has finished and there are no more BLS
-  // requests to execute. Otherwise, the Python backend will continuously
-  // execute BLS requests pushed to the message queue.
-  while (ipc_message->Command() ==
-             PYTHONSTUB_CommandType::PYTHONSTUB_InferExecRequest ||
-         ipc_message->Command() ==
-             PYTHONSTUB_CommandType::PYTHONSTUB_InferStreamExecRequest) {
-    std::packaged_task<void()> task([this, ipc_message] {
-      ExecuteBLSRequest(
-          ipc_message,
-          (ipc_message->Command() ==
-           PYTHONSTUB_CommandType::PYTHONSTUB_InferStreamExecRequest));
-    });
-    std::future<void> future =
-        boost::asio::post(*thread_pool_, std::move(task));
-    futures_.emplace_back(std::move(future));
-
-    auto error = Stub()->ReceiveMessageFromStub(response_message);
-    if (error != nullptr) {
-      restart = true;
-      RespondErrorToAllRequests(
-          TRITONSERVER_ErrorMessage(error), responses, requests, request_count);
-      return;
-    }
-
-    RESPOND_ALL_AND_RETURN_IF_EXCEPTION(
-        responses, request_count,
-        ipc_message = IPCMessage::LoadFromSharedMemory(
-            Stub()->ShmPool(), response_message));
-  }
-
-  uint64_t compute_end_ns = 0;
-  SET_TIMESTAMP(compute_end_ns);
-  reporter.SetComputeEndNs(compute_end_ns);
-
-  // Parsing the request response
-  AllocatedSharedMemory<char> response_batch;
-  RESPOND_ALL_AND_RETURN_IF_EXCEPTION(
-      responses, request_count,
-      response_batch = Stub()->ShmPool()->Load<char>(ipc_message->Args()));
-
-  ResponseBatch* response_batch_shm_ptr =
-      reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
-
-  // If inference fails, release all the requests and send an error response.
-  // If inference fails at this stage, it usually indicates a bug in the model
-  // code
-  if (response_batch_shm_ptr->has_error) {
-    if (response_batch_shm_ptr->is_error_set) {
-      std::unique_ptr<PbString> error_message_shm;
-      RESPOND_ALL_AND_RETURN_IF_EXCEPTION(
-          responses, request_count,
-          error_message_shm = PbString::LoadFromSharedMemory(
-              Stub()->ShmPool(), response_batch_shm_ptr->error));
-      RespondErrorToAllRequests(
-          error_message_shm->String().c_str(), responses, requests,
-          request_count);
-    } else {
-      const char* error_message =
-          "Failed to fetch the error in response batch.";
-      RespondErrorToAllRequests(
-          error_message, responses, requests, request_count);
-    }
-
-    // Reset the release flags for all the requests.
-    for (auto& infer_request : pb_infer_requests) {
-      infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
-    }
-    return;
-  }
-
-  bi::managed_external_buffer::handle_t* response_shm_handle =
-      reinterpret_cast<bi::managed_external_buffer::handle_t*>(
-          response_batch.data_.get() + sizeof(ResponseBatch));
-
-  // If the output provided by the model is in GPU, we will pass the list of
-  // buffers provided by Triton to the stub process.
-  bool has_gpu_output = false;
-  std::vector<bool> requires_deferred_callback;
-
-  std::vector<std::unique_ptr<InferResponse>> shm_responses;
-  std::vector<std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>>
-      gpu_output_buffers(request_count);
-  GPUBuffersHelper gpu_buffer_helper;
-
-  for (uint32_t r = 0; r < request_count; ++r) {
-    NVTX_RANGE(nvtx_, "LoadingResponse " + Name());
-    TRITONBACKEND_Response* response = (*responses)[r];
-    TRITONBACKEND_Request* request = requests[r];
-    uint32_t requested_output_count = 0;
-    requires_deferred_callback.push_back(false);
-
-    shm_responses.emplace_back(nullptr);
-    std::unique_ptr<InferResponse>& infer_response = shm_responses.back();
-    try {
-      if (pb_infer_requests[r]->ReleaseFlags() ==
-          TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
-        // For rescheduled requests, we do not need to send a response.
-        LOG_IF_ERROR(
-            TRITONBACKEND_ResponseDelete((*responses)[r]),
-            "failed to delete response");
-        (*responses)[r] = nullptr;
-        continue;
-      }
-      infer_response = InferResponse::LoadFromSharedMemory(
-          Stub()->ShmPool(), response_shm_handle[r],
-          false /* open_cuda_handle */);
-      if (infer_response->HasError()) {
-        TRITONSERVER_Error* err = TRITONSERVER_ErrorNew(
-            infer_response->Error()->Code(),
-            infer_response->Error()->Message().c_str());
-
-        LOG_IF_ERROR(
-            TRITONBACKEND_ResponseSend(
-                (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err),
-            "failed sending response");
-        TRITONSERVER_ErrorDelete(err);
-        (*responses)[r] = nullptr;
-
-        // Reset the release flags for the request.
-        pb_infer_requests[r]->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
-
-        // If has_error is true, we do not look at the response tensors.
-        continue;
-      }
-    }
-    catch (const PythonBackendException& pb_exception) {
-      TRITONSERVER_Error* err = TRITONSERVER_ErrorNew(
-          TRITONSERVER_ERROR_INTERNAL, pb_exception.what());
-      LOG_IF_ERROR(
-          TRITONBACKEND_ResponseSend(
-              (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err),
-          "failed sending response");
-      TRITONSERVER_ErrorDelete(err);
-      (*responses)[r] = nullptr;
-
-      // Reset the release flags for the request.
-      pb_infer_requests[r]->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
-
-      continue;
-    }
-
-    GUARDED_RESPOND_IF_ERROR(
-        responses, r,
-        TRITONBACKEND_RequestOutputCount(request, &requested_output_count));
-
-    std::set<std::string> requested_output_names;
-    for (size_t j = 0; j < requested_output_count; ++j) {
-      const char* output_name;
-      GUARDED_RESPOND_IF_ERROR(
-          responses, r,
-          TRITONBACKEND_RequestOutputName(request, j, &output_name));
-      requested_output_names.insert(output_name);
-    }
-
-    bool require_deferred_callback = false;
-
-#ifdef TRITON_ENABLE_GPU
-    for (auto& output_tensor : infer_response->OutputTensors()) {
-      if (output_tensor->MemoryType() == TRITONSERVER_MEMORY_GPU) {
-        // Attempt to use the cuda shared memory pool for GPU tensor.
-        ShareCUDAMemoryPool(output_tensor->MemoryTypeId());
-      }
-    }
-#endif  // TRITON_ENABLE_GPU
-
-    gpu_output_buffers[r] =
-        std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>{};
-    infer_response->Send(
-        response, CudaStream(), require_deferred_callback,
-        TRITONSERVER_RESPONSE_COMPLETE_FINAL, Stub()->ShmPool(),
-        gpu_buffer_helper, gpu_output_buffers[r], requested_output_names);
-
-    requires_deferred_callback[r] = require_deferred_callback;
-
-    if (requires_deferred_callback[r]) {
-      has_gpu_output = true;
-    }
-  }
-
-  // Finalize the execute.
-  execute_finalize.Complete();
-
-  // If the output tensor is in GPU, there will be a second round trip
-  // required for filling the GPU buffers provided by the main process.
-  if (has_gpu_output) {
-    ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers;
-    gpu_buffer_helper.Complete(Stub()->ShmPool());
-    ipc_message->Args() = gpu_buffer_helper.ShmHandle();
-    SendMessageAndReceiveResponse(
-        ipc_message->ShmHandle(), response_message, restart, responses,
-        requests, 0);
-
-    bool cuda_copy = false;
-
-    uint32_t response_index = 0;
-    for (auto& gpu_output_buffer : gpu_output_buffers) {
-      for (auto& buffer_memory_pair : gpu_output_buffer) {
-        auto& pb_memory = buffer_memory_pair.first;
-        void* pointer = buffer_memory_pair.second;
-        bool cuda_used = false;
-
-        if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) {
-          GUARDED_RESPOND_IF_ERROR(
-              responses, response_index,
-              CopyBuffer(
-                  "Failed to copy the output tensor to buffer.",
-                  TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0,
-                  pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
-                  CudaStream(), &cuda_used));
-          cuda_copy |= cuda_used;
-        } else if (
-            (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) &&
-            pb_memory->UseCUDASharedPool() &&
-            (pb_memory->DataPtr() != pointer)) {
-          // If the data pointer from pb_memory is not the same as the pointer,
-          // it means that the Triton-provided buffer is not used during tensor
-          // transfer. Instead, an intermediate buffer that uses CUDA shared
-          // memory pool is used. In this case, we need to copy the data
-          // from the intermediate buffer back to the Triton-provided buffer.
-          GUARDED_RESPOND_IF_ERROR(
-              responses, response_index,
-              CopyBuffer(
-                  "Failed to copy the output tensor to buffer.",
-                  TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
-                  TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(),
-                  pb_memory->ByteSize(), pb_memory->DataPtr(), pointer,
-                  CudaStream(), &cuda_used));
-          cuda_copy |= cuda_used;
-        }
-      }
-      response_index++;
-#ifdef TRITON_ENABLE_GPU
-      if (cuda_copy) {
-        cudaStreamSynchronize(stream_);
-      }
-#endif  // TRITON_ENABLE_GPU
-    }
-  }
-
-  bls_defer.Complete();
-  for (uint32_t r = 0; r < request_count; ++r) {
-    if (requires_deferred_callback[r]) {
-      shm_responses[r]->DeferredSendCallback();
-    }
-  }
-
-  uint64_t exec_end_ns = 0;
-  SET_TIMESTAMP(exec_end_ns);
-  reporter.SetExecEndNs(exec_end_ns);
-  reporter.SetBatchStatistics(total_batch_size);
-
-  return;
-}
-
 void
 ModelInstanceState::PrepareResponseBatch(
     ResponseBatch** response_batch,
@@ -1873,18 +1386,13 @@ ModelInstanceState::ShareCUDAMemoryPool(const int32_t device_id)
 
 ModelInstanceState::~ModelInstanceState()
 {
-  ModelState* model_state = reinterpret_cast<ModelState*>(Model());
   Stub()->UpdateHealth();
   if (Stub()->IsHealthy()) {
-    if (model_state->IsDecoupled()) {
-      // Wait for all the pending tasks to finish.
-      thread_pool_->wait();
-      // Push a dummy message to signal the thread to terminate.
-      Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE);
-      decoupled_monitor_.join();
-    } else {
-      thread_pool_->wait();
-    }
+    // Wait for all the pending tasks to finish.
+    thread_pool_->wait();
+    // Push a dummy message to signal the thread to terminate.
+    Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE);
+    decoupled_monitor_.join();
   }
   // Terminate stub first to allow any last messages to be received by the back
   // end before deallocating the queue memory
@@ -2445,36 +1953,10 @@ TRITONBACKEND_ModelInstanceExecute(
 
   // If restart is equal to true, it indicates that the stub process is
   // unhealthy and needs a restart.
-  bool restart = false;
-  ModelState* model_state =
-      reinterpret_cast<ModelState*>(instance_state->Model());
-  std::vector<std::unique_ptr<InferRequest>> infer_requests;
-  if (!model_state->IsDecoupled()) {
-    instance_state->ProcessRequests(
-        requests, request_count, infer_requests, restart);
+  // TODO: Implement restart on decoupled
 
-    if (restart) {
-      LOG_MESSAGE(
-          TRITONSERVER_LOG_ERROR,
-          "Stub process is unhealthy and it will be restarted.");
-      instance_state->TerminateMonitor();
-      instance_state->Stub()->KillStubProcess();
-      TRITONSERVER_Error* err = instance_state->Stub()->Setup();
-      if (err == nullptr) {
-        instance_state->StartMonitor();
-      }
-      LOG_IF_ERROR(err, "Failed to restart the stub process.");
-      err = instance_state->Stub()->Launch();
-      LOG_IF_ERROR(
-          err,
-          "Failed to restart the stub process: failed to launch "
-          "the stub process.");
-      // Reset the release flags for all the requests.
-      for (auto& infer_request : infer_requests) {
-        infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
-      }
-    }
-  } else {
+  std::vector<std::unique_ptr<InferRequest>> infer_requests;
+  {
     uint64_t exec_start_ns = 0;
     SET_TIMESTAMP(exec_start_ns);
 
@@ -2483,7 +1965,7 @@ TRITONBACKEND_ModelInstanceExecute(
         nullptr);
     reporter.SetExecStartNs(exec_start_ns);
 
-    error = instance_state->ProcessRequestsDecoupled(
+    error = instance_state->ProcessRequests(
         requests, request_count, infer_requests, reporter);
 
     uint64_t exec_end_ns = 0;
diff --git a/src/python_be.h b/src/python_be.h
index 9618204c..78522f5d 100644
--- a/src/python_be.h
+++ b/src/python_be.h
@@ -309,26 +309,8 @@ class ModelInstanceState : public BackendModelInstance {
   // Launch stub process.
   TRITONSERVER_Error* LaunchStubProcess();
 
-  TRITONSERVER_Error* SendMessageToStub(
-      bi::managed_external_buffer::handle_t message);
   void ResponseSendDecoupled(std::shared_ptr<IPCMessage> response_send_message);
 
-  // Checks whether the stub process is live
-  bool IsStubProcessAlive();
-
-  // Get a message from the stub process
-  void SendMessageAndReceiveResponse(
-      bi::managed_external_buffer::handle_t message,
-      bi::managed_external_buffer::handle_t& response, bool& restart,
-      std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
-      TRITONBACKEND_Request** requests, const uint32_t request_count);
-
-  // Responds to all the requests with an error message.
-  void RespondErrorToAllRequests(
-      const char* message,
-      std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
-      TRITONBACKEND_Request** requests, const uint32_t request_count);
-
   // In the decoupled mode, the parent message queue is monitored only by this
   // function during the execute phase. No other thread should pop any message
   // from the message queue in the decoupled mode.
@@ -347,14 +329,8 @@ class ModelInstanceState : public BackendModelInstance {
       TRITONBACKEND_Request* request,
       std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses);
 
-  // Process all the requests obtained from Triton.
-  void ProcessRequests(
-      TRITONBACKEND_Request** requests, const uint32_t request_count,
-      std::vector<std::unique_ptr<InferRequest>>& pb_infer_requests,
-      bool& restart);
-
-  // Process all the requests in the decoupled mode.
-  TRITONSERVER_Error* ProcessRequestsDecoupled(
+  TRITONSERVER_Error* ProcessRequests(
       TRITONBACKEND_Request** requests, const uint32_t request_count,
       std::vector<std::unique_ptr<InferRequest>>& pb_infer_requests,
       PbMetricReporter& pb_metric_reporter);
@@ -368,9 +344,6 @@ class ModelInstanceState : public BackendModelInstance {
   // Cleanup BLS responses
   void CleanupBLSResponses();
 
-  // Wait for BLS requests to complete
-  void WaitForBLSRequestsToFinish();
-
   // Check the incoming requests for errors
   TRITONSERVER_Error* CheckIncomingRequests(
       TRITONBACKEND_Request** requests, const uint32_t request_count,
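
Reviewer note (not part of the patch): after this change the stub always runs ProcessRequests() and routes whatever execute() returns through ProcessReturnedResponses(), so the model-side contract is uniformly enforced: a non-decoupled model returns one InferenceResponse per request (None only for a rescheduled request), a decoupled model returns None and uses response senders, and `async def execute` is supported in both modes (decoupled coroutines now run in the background). The sketch below is illustrative only; it assumes the standard triton_python_backend_utils API, and the empty output-tensor lists are placeholders.

    # model.py sketch -- illustrative only, not part of this diff.
    # Assumes the standard triton_python_backend_utils API; a real model
    # would populate output_tensors instead of the empty placeholders.
    import triton_python_backend_utils as pb_utils

    class TritonPythonModel:
        # Non-decoupled model: execute() may be a coroutine; it must return
        # exactly one InferenceResponse per request.
        async def execute(self, requests):
            return [
                pb_utils.InferenceResponse(output_tensors=[])
                for _ in requests
            ]

    # Decoupled variant, shown as a second class only for illustration
    # (a real model.py defines a single class named TritonPythonModel):
    # send responses through the response sender and return None. With
    # this patch the coroutine is left running in the background.
    class TritonPythonDecoupledModel:
        async def execute(self, requests):
            for request in requests:
                sender = request.get_response_sender()
                sender.send(
                    pb_utils.InferenceResponse(output_tensors=[]),
                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
            return None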