From 985c5a24173a307f575824df0cc39d5bc7c5a277 Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Tue, 2 Apr 2024 22:56:31 -0700 Subject: [PATCH] Improve handling for async execute future object --- src/pb_stub.cc | 53 ++++++++++++++++++++++---------------------------- src/pb_stub.h | 4 ++-- 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 42122b79..f35db5e6 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -904,33 +904,30 @@ Stub::RunCoroutine(py::object coroutine) py::object py_future = py::module_::import("asyncio").attr( "run_coroutine_threadsafe")(coroutine, loop); - { - std::lock_guard lock(async_event_futures_mu_); - - std::shared_ptr> shared_future(new std::future()); - std::future c_future = std::async( - std::launch::async, [this, shared_future, py_future]() mutable { - { - py::gil_scoped_acquire gil_acquire; - try { - py_future.attr("result")(); - } - catch (const PythonBackendException& pb_exception) { - LOG_ERROR << pb_exception.what(); - } - catch (const py::error_already_set& error) { - LOG_ERROR << error.what(); - } - py_future = py::none(); + std::shared_ptr> shared_future(new std::future()); + std::future c_future = std::async( + std::launch::async, [this, shared_future, py_future]() mutable { + { + py::gil_scoped_acquire gil_acquire; + try { + py_future.attr("result")(); } - { - std::lock_guard lock(async_event_futures_mu_); - async_event_futures_.erase(shared_future); + catch (const PythonBackendException& pb_exception) { + LOG_ERROR << pb_exception.what(); } - }); - *shared_future = std::move(c_future); - async_event_futures_.emplace(std::move(shared_future)); - } + catch (const py::error_already_set& error) { + LOG_ERROR << error.what(); + } + py_future = py::none(); + } + std::vector>> empty; + { + std::lock_guard lock(async_event_futures_mu_); + done_async_event_futures_.swap(empty); + done_async_event_futures_.emplace_back(std::move(shared_future)); + } + }); + *shared_future = std::move(c_future); return py::none(); } @@ -948,10 +945,6 @@ Stub::Finalize() finalizing_ = true; // Stop async event loop if created. if (!py::isinstance(async_event_loop_)) { - if (!async_event_futures_.empty()) { - LOG_ERROR << "Finalizing stub with " << async_event_futures_.size() - << " ongoing coroutines"; - } async_event_loop_.attr("stop")(); } // Call finalize if exists. @@ -1016,7 +1009,7 @@ Stub::~Stub() { py::gil_scoped_acquire acquire; - async_event_futures_.clear(); + done_async_event_futures_.clear(); async_event_loop_ = py::none(); model_instance_ = py::none(); } diff --git a/src/pb_stub.h b/src/pb_stub.h index 1b11c439..0d933c7e 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -33,7 +33,7 @@ #include #include #include -#include +#include #include "infer_request.h" #include "infer_response.h" @@ -371,7 +371,7 @@ class Stub { py::object deserialize_bytes_; py::object serialize_bytes_; py::object async_event_loop_; - std::unordered_set>> async_event_futures_; + std::vector>> done_async_event_futures_; std::mutex async_event_futures_mu_; std::unique_ptr> stub_message_queue_;