diff --git a/src/infer_request.cc b/src/infer_request.cc
index fc1e4206..57ea6cf1 100644
--- a/src/infer_request.cc
+++ b/src/infer_request.cc
@@ -74,7 +74,7 @@ InferRequest::InferRequest(
   pb_cancel_ =
       std::make_shared<PbCancel>(response_factory_address_, request_address_);
   response_sender_ = std::make_shared<ResponseSender>(
-      request_address_, response_factory_address_,
+      request_address_, response_factory_address_, nullptr /* is_decoupled */,
       Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_);
 #endif
 }
@@ -272,7 +272,8 @@ InferRequest::SaveToSharedMemory(std::unique_ptr<SharedMemoryManager>& shm_pool)
 std::unique_ptr<InferRequest>
 InferRequest::LoadFromSharedMemory(
     std::unique_ptr<SharedMemoryManager>& shm_pool,
-    bi::managed_external_buffer::handle_t request_handle, bool open_cuda_handle)
+    bi::managed_external_buffer::handle_t request_handle, bool open_cuda_handle,
+    bool const* is_model_decoupled)
 {
   AllocatedSharedMemory<char> infer_request_shm =
       shm_pool->Load<char>(request_handle);
@@ -328,7 +329,7 @@ InferRequest::LoadFromSharedMemory(
   return std::unique_ptr<InferRequest>(new InferRequest(
       infer_request_shm, request_id_shm, correlation_id_shm,
       requested_output_names_shm, model_name_shm, input_tensors, parameters_shm,
-      infer_trace_shm));
+      infer_trace_shm, is_model_decoupled));
 }
 
 InferRequest::InferRequest(
@@ -339,7 +340,8 @@ InferRequest::InferRequest(
     std::unique_ptr<PbString>& model_name_shm,
     std::vector<std::shared_ptr<PbTensor>>& input_tensors,
     std::unique_ptr<PbString>& parameters_shm,
-    std::unique_ptr<InferenceTrace>& infer_trace_shm)
+    std::unique_ptr<InferenceTrace>& infer_trace_shm,
+    bool const* is_model_decoupled)
     : infer_request_shm_(std::move(infer_request_shm)),
       request_id_shm_(std::move(request_id_shm)),
       requested_output_names_shm_(std::move(requested_output_names_shm)),
@@ -387,7 +389,7 @@ InferRequest::InferRequest(
   pb_cancel_ =
       std::make_shared<PbCancel>(response_factory_address_, request_address_);
   response_sender_ = std::make_shared<ResponseSender>(
-      request_address_, response_factory_address_,
+      request_address_, response_factory_address_, is_model_decoupled,
       Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_);
 #endif
 }
diff --git a/src/infer_request.h b/src/infer_request.h
index e0887624..c67e2fb0 100644
--- a/src/infer_request.h
+++ b/src/infer_request.h
@@ -118,7 +118,7 @@ class InferRequest {
   static std::unique_ptr<InferRequest> LoadFromSharedMemory(
       std::unique_ptr<SharedMemoryManager>& shm_pool,
       bi::managed_external_buffer::handle_t request_handle,
-      bool open_cuda_handle);
+      bool open_cuda_handle, bool const* is_model_decoupled);
 
   /// Disallow copying the inference request object.
   DISALLOW_COPY_AND_ASSIGN(InferRequest);
@@ -135,7 +135,8 @@ class InferRequest {
       std::unique_ptr<PbString>& model_name_shm,
       std::vector<std::shared_ptr<PbTensor>>& input_tensors,
       std::unique_ptr<PbString>& parameters_shm,
-      std::unique_ptr<InferenceTrace>& infer_trace_shm);
+      std::unique_ptr<InferenceTrace>& infer_trace_shm,
+      bool const* is_model_decoupled);
 
   std::string request_id_;
   CorrelationId correlation_id_;
diff --git a/src/pb_stub.cc b/src/pb_stub.cc
index 2f5c556c..2ad9b738 100644
--- a/src/pb_stub.cc
+++ b/src/pb_stub.cc
@@ -658,7 +658,8 @@ Stub::LoadRequestsFromSharedMemory(RequestBatch* request_batch_shm_ptr)
   for (size_t i = 0; i < batch_size; i++) {
     std::shared_ptr<InferRequest> infer_request =
         InferRequest::LoadFromSharedMemory(
-            shm_pool_, request_shm_handle[i], true /* open_cuda_handle */);
+            shm_pool_, request_shm_handle[i], true /* open_cuda_handle */,
+            &ipc_control_->decoupled /* is_model_decoupled */);
     py_request_list.append(infer_request);
   }
 
diff --git a/src/python_be.cc b/src/python_be.cc
index 2dcb363c..84eb0db9 100644
--- a/src/python_be.cc
+++ b/src/python_be.cc
@@ -571,7 +571,8 @@ ModelInstanceState::ExecuteBLSRequest(
         reinterpret_cast<bi::managed_external_buffer::handle_t*>(
            request_batch.data_.get() + sizeof(RequestBatch));
    infer_request = InferRequest::LoadFromSharedMemory(
-        Stub()->ShmPool(), *request_handle, false /* open_cuda_handle */);
+        Stub()->ShmPool(), *request_handle, false /* open_cuda_handle */,
+        nullptr /* is_model_decoupled */);
 
     // If the BLS inputs are in GPU an additional round trip between the
     // stub process and the main process is required. The reason is that we
diff --git a/src/response_sender.cc b/src/response_sender.cc
index 038279db..3257cd9a 100644
--- a/src/response_sender.cc
+++ b/src/response_sender.cc
@@ -35,13 +35,31 @@
 
 namespace triton { namespace backend { namespace python {
 
+void
+AssertResponseSenderArgumentsWellFormed(
+    const std::shared_ptr<InferResponse>& response, const uint32_t flags)
+{
+  // Check the correctness of the provided flags.
+  if (flags != TRITONSERVER_RESPONSE_COMPLETE_FINAL && flags != 0) {
+    throw PythonBackendException(
+        "Unable to send response. Unsupported flag provided.");
+  }
+
+  if (flags == 0 && response == nullptr) {
+    throw PythonBackendException(
+        "Inference Response object must be provided when the response flags is "
+        "set to zero.");
+  }
+}
+
 ResponseSender::ResponseSender(
     intptr_t request_address, intptr_t response_factory_address,
-    std::unique_ptr<SharedMemoryManager>& shm_pool,
+    bool const* is_decoupled, std::unique_ptr<SharedMemoryManager>& shm_pool,
     const std::shared_ptr<PbCancel>& pb_cancel)
     : request_address_(request_address),
-      response_factory_address_(response_factory_address), shm_pool_(shm_pool),
-      closed_(false), pb_cancel_(pb_cancel)
+      response_factory_address_(response_factory_address),
+      is_decoupled_(is_decoupled), shm_pool_(shm_pool), pb_cancel_(pb_cancel),
+      closed_(false), number_of_response_sent_(0)
 {
 }
 
@@ -54,15 +72,32 @@ ResponseSender::~ResponseSender()
 }
 
 void
-ResponseSender::Send(
-    std::shared_ptr<InferResponse> infer_response, const uint32_t flags)
+ResponseSender::UpdateStateAndCounters(
+    const std::shared_ptr<InferResponse>& response, const uint32_t flags)
 {
-  // Release the GIL. This avoids a potential deadlock situation in the parent
-  // process, where every thread in the thread pool is indirectly waiting for a
-  // function in the stub process that acquires the GIL. Meanwhile, the current
-  // thread, which holds the GIL, is also waiting for the parent side to have
-  // the next available thread to pick up the job during resource contention.
-  py::gil_scoped_release release;
+  if (is_decoupled_ == nullptr) {
+    // TODO: Can a model access the response sender on a BLS infer request?
+    throw PythonBackendException(
+        "Unable to send response. Response sender has no reference to the "
+        "decoupled state of the model.");
+  }
+  bool is_decoupled = *is_decoupled_;
+
+  std::lock_guard<std::mutex> lk(mu_);
+
+  if (!is_decoupled) {
+    if (response != nullptr && number_of_response_sent_ > 0) {
+      throw PythonBackendException(
+          "Unable to send response. Non-decoupled model cannot send more than "
+          "one response.");
+    }
+    if (response == nullptr && flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL &&
+        number_of_response_sent_ == 0) {
+      throw PythonBackendException(
+          "Unable to send response. Non-decoupled model cannot send complete "
+          "final before sending a response.");
+    }
+  }
 
   if (closed_) {
     throw PythonBackendException(
@@ -72,18 +107,22 @@ ResponseSender::Send(
   if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
     closed_ = true;
   }
+  number_of_response_sent_++;
+}
 
-  // Check the correctness of the provided flags.
-  if (flags != TRITONSERVER_RESPONSE_COMPLETE_FINAL && flags != 0) {
-    throw PythonBackendException(
-        "Unable to send response. Unsupported flag provided.");
-  }
+void
+ResponseSender::Send(
+    std::shared_ptr<InferResponse> infer_response, const uint32_t flags)
+{
+  // Release the GIL. This avoids a potential deadlock situation in the parent
+  // process, where every thread in the thread pool is indirectly waiting for a
+  // function in the stub process that acquires the GIL. Meanwhile, the current
+  // thread, which holds the GIL, is also waiting for the parent side to have
+  // the next available thread to pick up the job during resource contention.
+  py::gil_scoped_release release;
 
-  if (flags == 0 && infer_response == nullptr) {
-    throw PythonBackendException(
-        "Inference Response object must be provided when the response flags is "
-        "set to zero.");
-  }
+  AssertResponseSenderArgumentsWellFormed(infer_response, flags);
+  UpdateStateAndCounters(infer_response, flags);
 
   std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
 
diff --git a/src/response_sender.h b/src/response_sender.h
index d29a6ab6..6303bb42 100644
--- a/src/response_sender.h
+++ b/src/response_sender.h
@@ -26,6 +26,8 @@
 
 #pragma once
 
+#include <mutex>
+
 #include "infer_response.h"
 #include "pb_cancel.h"
 #include "shm_manager.h"
@@ -36,17 +38,24 @@ class ResponseSender {
  public:
   ResponseSender(
       intptr_t request_address, intptr_t response_factory_address,
-      std::unique_ptr<SharedMemoryManager>& shm_pool,
+      bool const* is_decoupled, std::unique_ptr<SharedMemoryManager>& shm_pool,
       const std::shared_ptr<PbCancel>& pb_cancel);
   ~ResponseSender();
   void Send(std::shared_ptr<InferResponse> response, const uint32_t flags);
   bool IsCancelled();
 
  private:
+  void UpdateStateAndCounters(
+      const std::shared_ptr<InferResponse>& response, const uint32_t flags);
+
   intptr_t request_address_;
   intptr_t response_factory_address_;
+  bool const* is_decoupled_;
   std::unique_ptr<SharedMemoryManager>& shm_pool_;
-  bool closed_;
   std::shared_ptr<PbCancel> pb_cancel_;
+
+  std::mutex mu_;
+  bool closed_;
+  size_t number_of_response_sent_;
 };
 
 }}}  // namespace triton::backend::python
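For reference, the guard added in ResponseSender::UpdateStateAndCounters lets a non-decoupled model's sender accept at most one response and rejects a bare TRITONSERVER_RESPONSE_COMPLETE_FINAL flag sent before any response, with the counter and closed flag updated under a mutex. Below is a minimal standalone sketch of just that counting logic; SketchSender, MockResponse, kCompleteFinal, and the std::runtime_error messages are hypothetical stand-ins for ResponseSender, InferResponse, TRITONSERVER_RESPONSE_COMPLETE_FINAL, and PythonBackendException, and the real sender additionally serializes the response to shared memory and coordinates with the parent process.

// Illustrative sketch only; names below are stand-ins, not the backend's API.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>

constexpr uint32_t kCompleteFinal = 1;  // stand-in for the "complete final" flag

struct MockResponse {};

class SketchSender {
 public:
  explicit SketchSender(bool is_decoupled) : is_decoupled_(is_decoupled) {}

  void Send(const std::shared_ptr<MockResponse>& response, uint32_t flags)
  {
    std::lock_guard<std::mutex> lk(mu_);
    if (!is_decoupled_) {
      // A non-decoupled model may send exactly one response...
      if (response != nullptr && number_sent_ > 0) {
        throw std::runtime_error(
            "non-decoupled model cannot send more than one response");
      }
      // ...and may not send a bare "final" flag before that response.
      if (response == nullptr && flags == kCompleteFinal && number_sent_ == 0) {
        throw std::runtime_error("complete-final flag sent before any response");
      }
    }
    if (closed_) {
      throw std::runtime_error("response sender has been closed");
    }
    if (flags == kCompleteFinal) {
      closed_ = true;
    }
    number_sent_++;
  }

 private:
  const bool is_decoupled_;
  std::mutex mu_;
  bool closed_ = false;
  std::size_t number_sent_ = 0;
};

int main()
{
  SketchSender sender(/* is_decoupled = */ false);
  sender.Send(std::make_shared<MockResponse>(), kCompleteFinal);  // accepted: first and final
  try {
    sender.Send(std::make_shared<MockResponse>(), 0);  // rejected: second response
  }
  catch (const std::exception& e) {
    std::cout << "rejected: " << e.what() << std::endl;
  }
  return 0;
}

Keeping the checks and the counter increment behind the same mutex, as the diff does with mu_, is what lets concurrent Send calls observe a consistent response count.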