Add response sender to non-decoupled model and unify data pipelines
kthui committed May 22, 2024
1 parent 9d2c513 commit bc16e1a
Showing 5 changed files with 96 additions and 749 deletions.
7 changes: 0 additions & 7 deletions src/infer_request.cc
@@ -402,13 +402,6 @@ InferRequest::IsCancelled()
std::shared_ptr<ResponseSender>
InferRequest::GetResponseSender()
{
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
if (!stub->IsDecoupled()) {
throw PythonBackendException(
"'get_response_sender' function must be called only when the model is "
"using the decoupled transaction policy.");
}

return response_sender_;
}
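With the decoupled-only guard removed above, get_response_sender() is also callable from models running under the default (non-decoupled) transaction policy. The sketch below is illustrative only and is not part of this commit; it assumes the standard triton_python_backend_utils API and uses hypothetical tensor names. It shows a non-decoupled model sending its single response through the sender and returning None from execute:

```python
# Hypothetical model.py for a non-decoupled model (illustrative sketch).
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            in_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            out_tensor = pb_utils.Tensor("OUTPUT0", in_tensor.as_numpy())
            response = pb_utils.InferenceResponse(output_tensors=[out_tensor])
            # Non-decoupled models produce exactly one response per request,
            # so the response is sent with the FINAL flag.
            request.get_response_sender().send(
                response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        # Returning None signals that responses were already sent.
        return None
```

Any additional validation performed on the server side is outside this diff, so treat the snippet as a usage illustration rather than a guarantee of behavior.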

240 changes: 70 additions & 170 deletions src/pb_stub.cc
@@ -402,11 +402,7 @@ Stub::RunCommand()
shm_pool_->Load<char>(ipc_message->Args());
RequestBatch* request_batch_shm_ptr =
reinterpret_cast<RequestBatch*>(request_batch.data_.get());
if (!ipc_control_->decoupled) {
ProcessRequests(request_batch_shm_ptr);
} else {
ProcessRequestsDecoupled(request_batch_shm_ptr);
}
ProcessRequests(request_batch_shm_ptr);

} break;
case PYTHONSTUB_CommandType::PYTHONSTUB_FinalizeRequest:
@@ -597,18 +593,6 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
initialized_ = true;
}

void
Stub::ProcessResponse(InferResponse* response)
{
response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */);

for (auto& output_tensor : response->OutputTensors()) {
if (!output_tensor->IsCPU()) {
gpu_tensors_.push_back(output_tensor);
}
}
}

void
Stub::LoadGPUBuffers(std::unique_ptr<IPCMessage>& ipc_message)
{
@@ -682,7 +666,7 @@ Stub::LoadRequestsFromSharedMemory(RequestBatch* request_batch_shm_ptr)
}

void
Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)
Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
{
py::list py_request_list =
LoadRequestsFromSharedMemory(request_batch_shm_ptr);
@@ -718,18 +702,21 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)

py::object execute_return =
model_instance_.attr("execute")(py_request_list);

bool is_coroutine = py::module::import("asyncio")
.attr("iscoroutine")(execute_return)
.cast<bool>();
if (is_coroutine) {
RunCoroutine(execute_return);
} else {
if (!py::isinstance<py::none>(execute_return)) {
throw PythonBackendException(
"Python model '" + name_ +
"' is using the decoupled mode and the execute function must "
"return None.");
if (IsDecoupled()) {
// Do not wait for async decoupled execute to return.
RunCoroutine(execute_return, true /* in_background */);
} else {
py::object coroutine_return =
RunCoroutine(execute_return, false /* in_background */);
ProcessReturnedResponses(py_request_list, coroutine_return);
}
} else {
ProcessReturnedResponses(py_request_list, execute_return);
}
}
}
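Because the unified ProcessRequests path above now waits for a coroutine returned by execute and forwards its result to ProcessReturnedResponses for non-decoupled models, an async execute that returns one response per request becomes a supported pattern under the default transaction policy. The following is a minimal sketch under that assumption, again with hypothetical tensor names and not taken from this commit:

```python
# Hypothetical async model.py for a non-decoupled model (illustrative sketch).
import asyncio

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    async def execute(self, requests):
        # Stand-in for real asynchronous work such as a BLS call or I/O.
        await asyncio.sleep(0)
        responses = []
        for request in requests:
            in_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            out_tensor = pb_utils.Tensor("OUTPUT0", in_tensor.as_numpy())
            responses.append(
                pb_utils.InferenceResponse(output_tensors=[out_tensor]))
        # The returned list is handed to ProcessReturnedResponses, which
        # sends each entry through the matching request's response sender.
        return responses
```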
@@ -757,151 +744,60 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)
}

void
Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
Stub::ProcessReturnedResponses(
py::list py_requests, py::object py_responses_obj)
{
std::unique_ptr<IPCMessage> execute_response =
IPCMessage::Create(shm_pool_, false /* Inline response */);
execute_response->Command() = PYTHONSTUB_ExecuteResponse;

AllocatedSharedMemory<char> response_batch = shm_pool_->Construct<char>(
request_batch_shm_ptr->batch_size *
sizeof(bi::managed_external_buffer::handle_t) +
sizeof(ResponseBatch));
ResponseBatch* response_batch_shm_ptr =
reinterpret_cast<ResponseBatch*>(response_batch.data_.get());

std::unique_ptr<PbString> error_string_shm;
py::list inference_responses;

bi::managed_external_buffer::handle_t* responses_shm_handle =
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
response_batch.data_.get() + sizeof(ResponseBatch));

py::list responses;

// Notifying the stub should be after responses.
ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); });
ScopedDefer _(
[this, &execute_response] { SendIPCMessage(execute_response); });

execute_response->Args() = response_batch.handle_;

bool has_exception = false;
std::string error_string;
try {
response_batch_shm_ptr->has_error = false;
response_batch_shm_ptr->is_error_set = false;

uint32_t batch_size = request_batch_shm_ptr->batch_size;

if (batch_size == 0) {
return;
}

py::list py_request_list =
LoadRequestsFromSharedMemory(request_batch_shm_ptr);

if (!py::hasattr(model_instance_, "execute")) {
std::string message = "Python model " + model_context_.PythonModelPath() +
" does not implement `execute` method.";
throw PythonBackendException(message);
}

py::object request_list = py_request_list;
py::module asyncio = py::module::import("asyncio");

// Execute Response
py::object execute_return;
py::object responses_obj;
bool is_coroutine;

{
NVTX_RANGE(nvtx_, "PyExecute " + name_);
execute_return = model_instance_.attr("execute")(request_list);
is_coroutine = asyncio.attr("iscoroutine")(execute_return).cast<bool>();
}

if (is_coroutine) {
responses_obj = asyncio.attr("run")(execute_return);
} else {
responses_obj = execute_return;
}

// Check the return type of execute function.
if (!py::isinstance<py::list>(responses_obj)) {
std::string str = py::str(execute_return.get_type());
throw PythonBackendException(
std::string("Expected a list in the execute return, found type '") +
str + "'.");
}

responses = responses_obj;
size_t response_size = py::len(responses);

// If the number of request objects do not match the number of
// response objects throw an error.
if (response_size != batch_size) {
std::string err =
"Number of InferenceResponse objects do not match the number "
"of "
"InferenceRequest objects. InferenceRequest(s) size is:" +
std::to_string(batch_size) + ", and InferenceResponse(s) size is:" +
std::to_string(response_size) + "\n";
throw PythonBackendException(err);
}

for (size_t i = 0; i < response_size; i++) {
// Check the return type of execute function.
InferRequest* infer_request = py_request_list[i].cast<InferRequest*>();
if (infer_request->ReleaseFlags() ==
TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
if (!py::isinstance<py::none>(responses[i])) {
// When the request is rescheduled in non-decoupled model, the
// response must be None.
std::string str = py::str(responses[i].get_type());
throw PythonBackendException(
"Expected a None object in the execute function return list for "
"reschduled request, "
"found type '" +
str + "'.");
}
} else {
if (!py::isinstance<InferResponse>(responses[i])) {
std::string str = py::str(responses[i].get_type());
throw PythonBackendException(
std::string(
"Expected an 'InferenceResponse' object in the execute "
"function return list, found type '") +
str + "'.");
}
InferResponse* infer_response = responses[i].cast<InferResponse*>();
infer_response->PruneOutputTensors(
infer_request->RequestedOutputNames());
ProcessResponse(infer_response);
responses_shm_handle[i] = infer_response->ShmHandle();
}
}
response_batch_shm_ptr->batch_size = response_size;
// Return if there is nothing to process.
if (py::isinstance<py::none>(py_responses_obj)) {
return;
}
catch (const PythonBackendException& pb_exception) {
has_exception = true;
error_string = pb_exception.what();
// Only non-decoupled may return responses.
if (IsDecoupled()) {
throw PythonBackendException(
"Python model '" + name_ +
"' is using the decoupled mode and the execute function must return "
"None.");
}
catch (const py::error_already_set& error) {
has_exception = true;
error_string = error.what();
// Check responses is a list.
if (!py::isinstance<py::list>(py_responses_obj)) {
throw PythonBackendException(
"Expected a list in the execute return, found type '" +
std::string(py::str(py_responses_obj.get_type())) + "'.");
}
py::list py_responses = py_responses_obj;
// Responses and requests length must match.
size_t requests_size = py::len(py_requests);
size_t responses_size = py::len(py_responses);
if (requests_size != responses_size) {
throw PythonBackendException(
"Number of InferenceResponse objects do not match the number of "
"InferenceRequest objects. InferenceRequest(s) size is:" +
std::to_string(requests_size) + ", and InferenceResponse(s) size is:" +
std::to_string(responses_size) + "\n");
}

if (has_exception) {
std::string err_message =
std::string(
"Failed to process the request(s) for model '" + name_ +
"', message: ") +
error_string;
error_string_shm = PbString::Create(shm_pool_, error_string);
response_batch_shm_ptr->has_error = true;
response_batch_shm_ptr->is_error_set = true;
response_batch_shm_ptr->error = error_string_shm->ShmHandle();
for (size_t i = 0; i < responses_size; i++) {
if (!py::isinstance<py::none>(py_responses[i])) {
InferRequest* request = py_requests[i].cast<InferRequest*>();
// Response must be None if rescheduled.
if (request->ReleaseFlags() == TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) {
throw PythonBackendException(
"Expected a None object in the execute function return list for "
"reschduled request, found type '" +
std::string(py::str(py_responses[i].get_type())) + "'.");
}
// Send the response.
if (!py::isinstance<InferResponse>(py_responses[i])) {
throw PythonBackendException(
"Expected an 'InferenceResponse' object in the execute function "
"return list, found type '" +
std::string(py::str(py_responses[i].get_type())) + "'.");
}
std::shared_ptr<InferResponse> response =
py_responses[i].cast<std::shared_ptr<InferResponse>>();
request->GetResponseSender()->Send(
response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
}
}
}

@@ -923,15 +819,19 @@ Stub::GetAsyncEventLoop()
return async_event_loop_;
}

void
Stub::RunCoroutine(py::object coroutine)
py::object
Stub::RunCoroutine(py::object coroutine, bool in_background)
{
py::object loop = GetAsyncEventLoop();
py::object py_future = py::module_::import("asyncio").attr(
"run_coroutine_threadsafe")(coroutine, loop);
py_future.attr("add_done_callback")(
py::module_::import("c_python_backend_utils")
.attr("async_event_future_done_callback"));
if (in_background) {
py_future.attr("add_done_callback")(
py::module_::import("c_python_backend_utils")
.attr("async_event_future_done_callback"));
return py::none();
}
return py_future.attr("result")();
}
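RunCoroutine now has two modes built on asyncio.run_coroutine_threadsafe: a background mode used by the decoupled path, which only attaches a done-callback, and a blocking mode used by the non-decoupled path, which waits for the result. Below is a plain-Python sketch of the equivalent control flow; the done-callback here is a stand-in for async_event_future_done_callback, not the real helper:

```python
# Illustrative Python equivalent of the two RunCoroutine modes.
import asyncio
from concurrent.futures import Future


def run_coroutine(coroutine, loop: asyncio.AbstractEventLoop, in_background: bool):
    future: Future = asyncio.run_coroutine_threadsafe(coroutine, loop)
    if in_background:
        # Decoupled path: do not block; just make sure any exception is
        # retrieved once the coroutine finishes.
        future.add_done_callback(lambda f: f.exception())
        return None
    # Non-decoupled path: block until the coroutine completes and return
    # its result so the returned responses can be processed.
    return future.result()
```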

void
7 changes: 3 additions & 4 deletions src/pb_stub.h
@@ -253,20 +253,19 @@ class Stub {
/// Execute a batch of requests.
void ProcessRequests(RequestBatch* request_batch_shm_ptr);

void ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr);
void ProcessReturnedResponses(
py::list py_requests, py::object py_responses_obj);

py::object GetAsyncEventLoop();

void RunCoroutine(py::object coroutine);
py::object RunCoroutine(py::object coroutine, bool in_background);

/// Get the memory manager message queue
std::unique_ptr<MessageQueue<uint64_t>>& MemoryManagerQueue();

/// Get the shared memory pool
std::unique_ptr<SharedMemoryManager>& ShmPool() { return shm_pool_; }

void ProcessResponse(InferResponse* response);

void ProcessBLSResponseDecoupled(std::unique_ptr<IPCMessage>& ipc_message);

void LoadGPUBuffers(std::unique_ptr<IPCMessage>& ipc_message);
