Decoupled Async Execute (#350)
* Add async decoupled execute

* Enable decoupled bls async exec

* Improve handling for async execute future object

* Add docs for async execute for decoupled model

* Fix link on docs

* Improve docs wording

* Improve destruction steps for async execute future object

* Piggy back on GIL for protection

* Document model should not modify event loop

* Use Python add_done_callback

* Protect infer_payload_

* Use traceback API that supports Python 3.8 and 3.9

* Update docs
kthui authored and mc-nv committed Apr 11, 2024
1 parent 4d42111 commit 121306e
Showing 5 changed files with 107 additions and 14 deletions.
21 changes: 18 additions & 3 deletions README.md
@@ -49,7 +49,7 @@ any C++ code.
- [Request Cancellation Handling](#request-cancellation-handling)
- [Decoupled mode](#decoupled-mode)
- [Use Cases](#use-cases)
- [Known Issues](#known-issues)
- [Async Execute](#async-execute)
- [Request Rescheduling](#request-rescheduling)
- [`finalize`](#finalize)
- [Model Config File](#model-config-file)
@@ -620,9 +620,24 @@ full power of what can be achieved from decoupled API. Read
[Decoupled Backends and Models](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/decoupled_models.md)
for more details on how to host a decoupled model.

##### Known Issues
##### Async Execute

* Currently, decoupled Python models can not make async infer requests.
Starting from 24.04, `async def execute(self, requests):` is supported for
decoupled Python models. Its coroutine will be executed by an AsyncIO event
loop shared with requests executing on the same model instance. The next
request for the model instance can start executing while the current request
is waiting.

This is useful for minimizing the number of model instances for models that
spend most of their time waiting, since requests can be executed concurrently
by AsyncIO. To take full advantage of this concurrency, it is vital that the
async execute function does not block the event loop from making progress
while it is waiting, for example while downloading over the network.
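
Shown below is a minimal sketch (illustrative only, not part of this commit)
of a decoupled model using `async def execute`. The tensor names and the
awaited work are hypothetical; the key point is awaiting instead of blocking:

```python
import asyncio

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    async def execute(self, requests):
        for request in requests:
            # Hypothetical non-blocking wait standing in for real async I/O
            # (e.g. an async download). A blocking call such as time.sleep()
            # here would stall every request sharing this event loop.
            await asyncio.sleep(0.1)

            input_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            output_tensor = pb_utils.Tensor("OUTPUT0", input_tensor.as_numpy())

            # Decoupled models send responses via the response sender; the
            # coroutine itself must (implicitly) return None.
            sender = request.get_response_sender()
            sender.send(
                pb_utils.InferenceResponse(output_tensors=[output_tensor]),
                flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
            )
```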

Notes:
* The model should not modify the running event loop, as this might cause
  unexpected issues.
* The server/backend does not control how many requests are added to the
  event loop by a model instance.

#### Request Rescheduling

90 changes: 80 additions & 10 deletions src/pb_stub.cc
@@ -104,6 +104,32 @@ PyDefaultArgumentToMutableType(const py::object& argument)
std::string(py::str(argument.get_type())));
}

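// Completion callback attached (via add_done_callback) to the future returned
// by asyncio.run_coroutine_threadsafe for a model's async execute coroutine.
// Logs any exception the coroutine raised so failures are not silently lost.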
void
AsyncEventFutureDoneCallback(const py::object& py_future)
{
// TODO: Why does using `py_future.result()` with an error hang on exit?
try {
py::object exception = py_future.attr("exception")();
if (!py::isinstance<py::none>(exception)) {
std::string err_msg = "";
py::object traceback = py::module_::import("traceback")
.attr("TracebackException")
.attr("from_exception")(exception)
.attr("format")();
for (py::handle line : traceback) {
err_msg += py::str(line);
}
LOG_ERROR << err_msg;
}
}
catch (const PythonBackendException& pb_exception) {
LOG_ERROR << pb_exception.what();
}
catch (const py::error_already_set& error) {
LOG_ERROR << error.what();
}
}

void
Stub::Instantiate(
int64_t shm_growth_size, int64_t shm_default_size,
@@ -533,6 +559,8 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
c_python_backend_utils.attr("InferenceResponse"));
c_python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());

async_event_loop_ = py::none();

py::object TritonPythonModel = sys.attr("TritonPythonModel");
deserialize_bytes_ = python_backend_utils.attr("deserialize_bytes_tensor");
serialize_bytes_ = python_backend_utils.attr("serialize_byte_tensor");
@@ -690,11 +718,18 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)

py::object execute_return =
model_instance_.attr("execute")(py_request_list);
if (!py::isinstance<py::none>(execute_return)) {
throw PythonBackendException(
"Python model '" + name_ +
"' is using the decoupled mode and the execute function must "
"return None.");
bool is_coroutine = py::module::import("asyncio")
.attr("iscoroutine")(execute_return)
.cast<bool>();
if (is_coroutine) {
RunCoroutine(execute_return);
} else {
if (!py::isinstance<py::none>(execute_return)) {
throw PythonBackendException(
"Python model '" + name_ +
"' is using the decoupled mode and the execute function must "
"return None.");
}
}
}
}
@@ -870,6 +905,35 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
}
}

py::object
Stub::GetAsyncEventLoop()
{
if (py::isinstance<py::none>(async_event_loop_)) {
// Create the event loop if it has not been created already.
py::module asyncio = py::module_::import("asyncio");
async_event_loop_ = asyncio.attr("new_event_loop")();
asyncio.attr("set_event_loop")(async_event_loop_);
py::object py_thread =
py::module_::import("threading")
.attr("Thread")(
"target"_a = async_event_loop_.attr("run_forever"),
"daemon"_a = true);
py_thread.attr("start")();
}
return async_event_loop_;
}

void
Stub::RunCoroutine(py::object coroutine)
{
py::object loop = GetAsyncEventLoop();
py::object py_future = py::module_::import("asyncio").attr(
"run_coroutine_threadsafe")(coroutine, loop);
py_future.attr("add_done_callback")(
py::module_::import("c_python_backend_utils")
.attr("async_event_future_done_callback"));
}
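
The C++ above roughly corresponds to the following pure-Python sketch
(illustrative only; the names `loop`, `log_exception`, and `run_coroutine` are
not part of the backend API):

```python
import asyncio
import threading
import traceback

# One event loop per model instance, driven by a daemon thread
# (mirrors Stub::GetAsyncEventLoop).
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


def log_exception(future):
    # Mirrors AsyncEventFutureDoneCallback: report coroutine failures
    # instead of dropping them silently.
    exc = future.exception()
    if exc is not None:
        print("".join(traceback.TracebackException.from_exception(exc).format()))


def run_coroutine(coro):
    # Mirrors Stub::RunCoroutine: schedule the coroutine on the shared loop
    # from the caller's thread and attach the completion callback.
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    future.add_done_callback(log_exception)
```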

void
Stub::UpdateHealth()
{
@@ -881,6 +945,10 @@ void
Stub::Finalize()
{
finalizing_ = true;
// Stop async event loop if created.
if (!py::isinstance<py::none>(async_event_loop_)) {
async_event_loop_.attr("stop")();
}
// Call finalize if exists.
if (initialized_ && py::hasattr(model_instance_, "finalize")) {
try {
@@ -943,6 +1011,7 @@ Stub::~Stub()

{
py::gil_scoped_acquire acquire;
async_event_loop_ = py::none();
model_instance_ = py::none();
}
stub_instance_.reset();
@@ -1729,11 +1798,6 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
[](std::shared_ptr<InferRequest>& infer_request,
const bool decoupled) {
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
if (stub->IsDecoupled()) {
throw PythonBackendException(
"Async BLS request execution is not support in the decoupled "
"API.");
}
py::object loop =
py::module_::import("asyncio").attr("get_running_loop")();
py::cpp_function callback = [&stub, infer_request, decoupled]() {
@@ -1860,6 +1924,12 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
"is_model_ready", &IsModelReady, py::arg("model_name").none(false),
py::arg("model_version").none(false) = "");

// This function is not part of the public API for Python backend. This is
// only used for internal callbacks.
module.def(
"async_event_future_done_callback", &AsyncEventFutureDoneCallback,
py::arg("py_future").none(false));

// This class is not part of the public API for Python backend. This is only
// used for internal testing purposes.
py::class_<SharedMemoryManager>(module, "SharedMemory")
7 changes: 6 additions & 1 deletion src/pb_stub.h
@@ -1,4 +1,4 @@
// Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
@@ -255,6 +255,10 @@ class Stub {

void ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr);

py::object GetAsyncEventLoop();

void RunCoroutine(py::object coroutine);

/// Get the memory manager message queue
std::unique_ptr<MessageQueue<uint64_t>>& MemoryManagerQueue();

@@ -363,6 +367,7 @@ class Stub {
py::object model_instance_;
py::object deserialize_bytes_;
py::object serialize_bytes_;
py::object async_event_loop_;
std::unique_ptr<MessageQueue<bi::managed_external_buffer::handle_t>>
stub_message_queue_;
std::unique_ptr<MessageQueue<bi::managed_external_buffer::handle_t>>
2 changes: 2 additions & 0 deletions src/python_be.cc
@@ -768,6 +768,7 @@ ModelInstanceState::ExecuteBLSRequest(
if (is_decoupled && (infer_response->Id() != nullptr)) {
// Need to manage the lifetime of InferPayload object for bls
// decoupled responses.
std::lock_guard<std::mutex> lock(infer_payload_mu_);
infer_payload_[reinterpret_cast<intptr_t>(infer_payload.get())] =
infer_payload;
}
@@ -961,6 +962,7 @@ ModelInstanceState::ProcessCleanupRequest(
intptr_t id = reinterpret_cast<intptr_t>(cleanup_message_ptr->id);
if (message->Command() == PYTHONSTUB_BLSDecoupledInferPayloadCleanup) {
// Remove the InferPayload object from the map.
std::lock_guard<std::mutex> lock(infer_payload_mu_);
infer_payload_.erase(id);
} else if (message->Command() == PYTHONSTUB_DecoupledResponseFactoryCleanup) {
// Delete response factory
1 change: 1 addition & 0 deletions src/python_be.h
@@ -296,6 +296,7 @@ class ModelInstanceState : public BackendModelInstance {
std::vector<std::future<void>> futures_;
std::unique_ptr<boost::asio::thread_pool> thread_pool_;
std::unordered_map<intptr_t, std::shared_ptr<InferPayload>> infer_payload_;
std::mutex infer_payload_mu_;
std::unique_ptr<RequestExecutor> request_executor_;

public:
