
Add test for improper response sending from model #7292

Merged: 9 commits, Jun 4, 2024
3 changes: 2 additions & 1 deletion qa/L0_backend_python/bls/test.sh
@@ -100,7 +100,8 @@ if [[ ${TEST_WINDOWS} == 0 ]]; then
echo "instance_group [ { kind: KIND_CPU} ]" >> models/libtorch_cpu/config.pbtxt

# Test with different sizes of CUDA memory pool
for CUDA_MEMORY_POOL_SIZE_MB in 64 128 ; do
# TODO: Why does 256 work in place of 128 on the decoupled data pipeline?
for CUDA_MEMORY_POOL_SIZE_MB in 64 256 ; do
CUDA_MEMORY_POOL_SIZE_BYTES=$((CUDA_MEMORY_POOL_SIZE_MB * 1024 * 1024))
SERVER_ARGS="--model-repository=${MODELDIR}/bls/models --backend-directory=${BACKEND_DIR} --log-verbose=1 --cuda-memory-pool-byte-size=0:${CUDA_MEMORY_POOL_SIZE_BYTES}"
for TRIAL in non_decoupled decoupled ; do
175 changes: 171 additions & 4 deletions qa/L0_backend_python/response_sender/response_sender_test.py
@@ -88,6 +88,20 @@ class ResponseSenderTest(unittest.TestCase):
"number_of_response_after_return": 0,
"send_complete_final_flag_after_return": False,
}
_inputs_parameters_one_response_pre_and_on_return = {
"number_of_response_before_return": 1,
"send_complete_final_flag_before_return": True,
"return_a_response": True,
"number_of_response_after_return": 0,
"send_complete_final_flag_after_return": False,
}
_inputs_parameters_one_response_on_and_post_return = {
"number_of_response_before_return": 0,
"send_complete_final_flag_before_return": False,
"return_a_response": True,
"number_of_response_after_return": 1,
"send_complete_final_flag_after_return": True,
}

def _get_inputs(
self,
@@ -195,6 +209,8 @@ def _assert_responses_exception(self, responses, expected_message):
self.assertIsNone(response["result"])
self.assertIsInstance(response["error"], InferenceServerException)
self.assertIn(expected_message, response["error"].message())
# There may be more responses, but currently only one is seen across all tests.
self.assertEqual(len(responses), 1)

def _assert_decoupled_infer_success(
self,
@@ -236,13 +252,16 @@ def _assert_decoupled_infer_success(
number_of_response_after_return,
)

def _assert_non_decoupled_infer_success(
def _assert_non_decoupled_infer_with_expected_response_success(
self,
number_of_response_before_return,
send_complete_final_flag_before_return,
return_a_response,
number_of_response_after_return,
send_complete_final_flag_after_return,
expected_number_of_response_before_return,
expected_return_a_response,
expected_number_of_response_after_return,
):
model_name = "response_sender"
responses = self._infer(
@@ -255,9 +274,9 @@ def _assert_non_decoupled_infer_success(
)
self._assert_responses_valid(
responses,
number_of_response_before_return,
return_a_response,
number_of_response_after_return,
expected_number_of_response_before_return,
expected_return_a_response,
expected_number_of_response_after_return,
)
# Do NOT group into a for-loop as it hides which model failed.
model_name = "response_sender_async"
@@ -271,9 +290,28 @@ def _assert_non_decoupled_infer_success(
)
self._assert_responses_valid(
responses,
expected_number_of_response_before_return,
expected_return_a_response,
expected_number_of_response_after_return,
)

def _assert_non_decoupled_infer_success(
self,
number_of_response_before_return,
send_complete_final_flag_before_return,
return_a_response,
number_of_response_after_return,
send_complete_final_flag_after_return,
):
self._assert_non_decoupled_infer_with_expected_response_success(
number_of_response_before_return,
send_complete_final_flag_before_return,
return_a_response,
number_of_response_after_return,
send_complete_final_flag_after_return,
expected_number_of_response_before_return=number_of_response_before_return,
expected_return_a_response=return_a_response,
expected_number_of_response_after_return=number_of_response_after_return,
)

# Decoupled model sends response final flag before request return.
@@ -411,6 +449,135 @@ def test_decoupled_one_response_on_return(self):
# TODO: Test async decoupled after fixing 'AsyncEventFutureDoneCallback':
# using `py_future.result()` with an error hangs on exit.

# Decoupled model sends 1 response and returns 1 response.
def test_decoupled_one_response_pre_and_on_return(self):
# Note: The response sent before return is valid and closes the response
# sender. Then, returning a response will generate an error, but since
# the response sender is closed, nothing is passed to the client.
responses = self._infer(
model_name="response_sender_decoupled",
**self._inputs_parameters_one_response_pre_and_on_return,
)
self._assert_responses_valid(
responses,
number_of_response_before_return=1,
return_a_response=0,
number_of_response_after_return=0,
)
# TODO: Test async decoupled after fixing 'AsyncEventFutureDoneCallback':
# using `py_future.result()` with an error hangs on exit.

# Decoupled model returns 1 response and sends 1 response.
def test_decoupled_one_response_on_and_post_return(self):
# Note: The returned response will result in an error response with the
# complete final flag and close the response sender and factory. Then,
# sending a response will raise an exception. Since the exception happens
# after the model returns, it cannot be caught by the stub (i.e. in a
# daemon thread), so nothing will happen.
responses = self._infer(
model_name="response_sender_decoupled",
**self._inputs_parameters_one_response_on_and_post_return,
)
self._assert_responses_exception(
responses,
expected_message="using the decoupled mode and the execute function must return None",
)
# TODO: Test async decoupled after fixing 'AsyncEventFutureDoneCallback':
# using `py_future.result()` with an error hangs on exit.

# Non-decoupled model sends response final flag before request return.
def test_non_decoupled_zero_response_pre_return(self):
# Note: The final flag will raise an exception which stops the model. Since the
# exception happens before the model returns, it is caught by the stub
# process, which passes it to the backend, and an error response with the
# final flag is sent.
expected_message = (
"Non-decoupled model cannot send complete final before sending a response"
)
model_name = "response_sender"
responses = self._infer(
model_name,
**self._inputs_parameters_zero_response_pre_return,
)
self._assert_responses_exception(responses, expected_message)
# Do NOT group into a for-loop as it hides which model failed.
model_name = "response_sender_async"
responses = self._infer(
model_name,
**self._inputs_parameters_zero_response_pre_return,
)
self._assert_responses_exception(responses, expected_message)

# Non-decoupled model sends response final flag after request return.
@unittest.skip("Model unload will hang, see the TODO comment.")
def test_non_decoupled_zero_response_post_return(self):
# Note: The final flag will raise an exception which stops the model. Since the
# exception happens after the model returns, it cannot be caught by the
# stub (i.e. in a daemon thread), so nothing will happen.
# TODO: Since the stub does not know if the model failed after returning, the
# complete final flag is not sent and will hang when unloading the model.
# How to detect such event and close the response factory?
raise NotImplementedError("No testing is performed")

# Non-decoupled model sends 2 responses before return.
def test_non_decoupled_two_response_pre_return(self):
# Note: The 1st response will make its way to the client, but sending the 2nd
# response will raise an exception which stops the model. Since the
# exception happens before the model returns, it is caught by the stub
# process, which passes it to the backend, and an error response with the
# final flag is sent. Since this is a non-decoupled model using a gRPC
# stream, any response after the 1st is discarded by the frontend.
self._assert_non_decoupled_infer_with_expected_response_success(
**self._inputs_parameters_two_response_pre_return,
expected_number_of_response_before_return=1,
expected_return_a_response=False,
expected_number_of_response_after_return=0,
)

# Non-decoupled model sends 2 responses after return.
@unittest.skip("Model unload will hang, see the TODO comment.")
def test_non_decoupled_two_response_post_return(self):
# Note: The 1st response will make its way to the client, but sending the 2nd
# response will raise an exception which stops the model. Since the
# exception happens after the model returns, it cannot be caught by the
# stub (i.e. in a daemon thread), so nothing will happen.
# TODO: Since the stub does not know if the model failed after returning, the
# complete final flag is not sent and will hang when unloading the model.
# How to detect such event and close the response factory?
self._assert_non_decoupled_infer_with_expected_response_success(
**self._inputs_parameters_two_response_post_return,
expected_number_of_response_before_return=0,
expected_return_a_response=False,
expected_number_of_response_after_return=1,
)

# Non-decoupled model sends 1 response and returns 1 response.
def test_non_decoupled_one_response_pre_and_on_return(self):
# Note: The sent response will make its way to the client with the complete
# final flag. The returned response will see the response sender is closed
# and raise an exception. The backend should see the request is closed and
# do nothing upon receiving the error from the stub.
self._assert_non_decoupled_infer_with_expected_response_success(
**self._inputs_parameters_one_response_pre_and_on_return,
expected_number_of_response_before_return=1,
expected_return_a_response=False,
expected_number_of_response_after_return=0,
)

# Non-decoupled model returns 1 response and sends 1 response.
def test_non_decoupled_one_response_on_and_post_return(self):
# Note: The returned response will be sent to the client with the complete
# final flag. The sent response will see the response sender is closed and
# raise an exception. Since the exception happens after the model returns,
# it cannot be caught by the stub (i.e. in a daemon thread), so nothing
# will happen.
self._assert_non_decoupled_infer_with_expected_response_success(
**self._inputs_parameters_one_response_on_and_post_return,
expected_number_of_response_before_return=0,
expected_return_a_response=True,
expected_number_of_response_after_return=0,
)


if __name__ == "__main__":
unittest.main()
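For context, a minimal sketch of the model-side response-sender pattern these tests exercise, assuming the standard `triton_python_backend_utils` API. This is not the actual test model; the real models (`response_sender`, `response_sender_decoupled`, ...) parameterize this behavior through `ResponseSenderModelCommon`, and the output tensor name below is an illustrative assumption.

```python
# Minimal sketch (not the actual test model): a decoupled Python backend model
# that sends one response before execute() returns, then closes the stream.
import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            sender = request.get_response_sender()
            output = pb_utils.Tensor("OUTPUT0", np.zeros(1, dtype=np.float32))
            # Send a response while execute() is still running ...
            sender.send(pb_utils.InferenceResponse(output_tensors=[output]))
            # ... then close the stream with the complete final flag.
            sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        # A decoupled model must return None; returning responses instead is
        # one of the error cases covered by the tests above.
        return None
```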
3 changes: 2 additions & 1 deletion qa/L0_backend_python/test.sh
@@ -448,7 +448,8 @@ SUBTESTS="lifecycle argument_validation logging custom_metrics"
# [DLIS-6122] Disable model_control & request_rescheduling tests for Windows since they require load/unload
# [DLIS-6123] Disable examples test for Windows since it requires updates to the example clients
if [[ ${TEST_WINDOWS} == 0 ]]; then
SUBTESTS+=" restart model_control examples request_rescheduling"
# TODO: Reimplement restart on the decoupled data pipeline and re-enable it.
SUBTESTS+=" model_control examples request_rescheduling"
fi
for TEST in ${SUBTESTS}; do
# Run each subtest in a separate virtual environment to avoid conflicts
2 changes: 1 addition & 1 deletion qa/python_models/response_sender/model.py
@@ -31,7 +31,7 @@

class TritonPythonModel:
def initialize(self, args):
self._common = ResponseSenderModelCommon(pb_utils, args)
self._common = ResponseSenderModelCommon(pb_utils)

def execute(self, requests):
return self._common.execute(requests, use_async=False)
2 changes: 1 addition & 1 deletion qa/python_models/response_sender/model_async.py
@@ -31,7 +31,7 @@

class TritonPythonModel:
def initialize(self, args):
self._common = ResponseSenderModelCommon(pb_utils, args)
self._common = ResponseSenderModelCommon(pb_utils)

async def execute(self, requests):
return self._common.execute(requests, use_async=True)
20 changes: 1 addition & 19 deletions qa/python_models/response_sender/model_common.py
@@ -25,19 +25,15 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
import json
import threading
import time

import numpy as np


class ResponseSenderModelCommon:
def __init__(self, pb_utils, args):
def __init__(self, pb_utils):
self._pb_utils = pb_utils
self._is_decoupled = pb_utils.using_decoupled_model_transaction_policy(
json.loads(args["model_config"])
)
self._background_tasks = set()

def _get_instructions_from_request(self, request):
@@ -123,20 +119,6 @@ def _send_responses(self, processed_requests, response_id_offset):
batch_size = request["batch_size"]
response_sender = request["response_sender"]
send_complete_final_flag = request["send_complete_final_flag"]

# TODO: gRPC frontend may segfault if non-decoupled model send response
# final flag separately from the response.
if (
not self._is_decoupled
and number_of_response == 1
and send_complete_final_flag
):
response_sender.send(
self._create_response(batch_size, response_id=response_id_offset),
flags=self._pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
)
continue

for response_id in range(number_of_response):
response_sender.send(
self._create_response(
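The branch removed above had worked around a gRPC frontend crash by sending a non-decoupled model's single response and its final flag in one call; with the workaround gone, the common path sends them in two calls, which the stream_infer_handler.cc change below is presumably meant to tolerate. A minimal sketch of the two call patterns, assuming the standard `triton_python_backend_utils` response-sender API (the helper names are illustrative):

```python
# Sketch of the two ways a response stream can be completed; helper names are
# illustrative, the response-sender calls are the standard pb_utils API.
import triton_python_backend_utils as pb_utils


def finish_in_one_call(response_sender, response):
    # (a) Response and final flag together: the path the removed workaround
    #     forced for non-decoupled models.
    response_sender.send(
        response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
    )


def finish_in_two_calls(response_sender, response):
    # (b) Response first, final flag on its own: the path now exercised for
    #     both decoupled and non-decoupled models.
    response_sender.send(response)
    response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
```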
18 changes: 5 additions & 13 deletions src/grpc/stream_infer_handler.cc
@@ -564,7 +564,8 @@ ModelStreamInferHandler::StreamInferResponseComplete(
LOG_VERBOSE(1) << "ModelStreamInferHandler::StreamInferComplete, context "
<< state->context_->unique_id_ << ", " << state->unique_id_
<< " step " << state->step_ << ", callback index "
<< state->cb_count_ << ", flags " << flags;
<< state->cb_count_ << ", flags " << flags
<< ", response is nullptr " << (iresponse == nullptr);

#ifdef TRITON_ENABLE_TRACING
if (state->cb_count_ == 1) {
@@ -573,19 +574,8 @@
}
#endif // TRITON_ENABLE_TRACING

// Log appropriate errors
bool is_complete =
state->complete_ || (flags & TRITONSERVER_RESPONSE_COMPLETE_FINAL) != 0;
if (!state->is_decoupled_) {
if (!is_complete) {
LOG_ERROR << "[INTERNAL] ModelStreamInfer received a response without "
"FINAL flag for a model with one-to-one transaction";
}
if (iresponse == nullptr) {
LOG_ERROR << "[INTERNAL] ModelStreamInfer received a null response for a "
"model with one-to-one transaction";
}
}

// If receiving the final callback then erase the state from the inflight
// state data structure to prevent cancellation being called on the request.
@@ -745,7 +735,9 @@ ModelStreamInferHandler::StreamInferResponseComplete(
}
} else {
state->step_ = Steps::WRITEREADY;
state->context_->WriteResponseIfReady(state);
if (is_complete) {
state->context_->WriteResponseIfReady(state);
}
}

state->complete_ = is_complete;
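On the client side, the frontend change above means a non-decoupled model over a gRPC stream should yield at most one response (or one error) even when the model misbehaves. The test's `_infer` helper is not part of this diff; below is a hedged sketch of how such streamed responses could be collected with `tritonclient.grpc`. The model name `response_sender` is from the tests, while the input name, dtype, and shape are assumptions for illustration.

```python
# Sketch of collecting streamed responses and errors from a gRPC client,
# similar in spirit to what the test's _assert_responses_* helpers check.
import queue

import numpy as np
import tritonclient.grpc as grpcclient

responses = queue.Queue()


def callback(result, error):
    # Both successful results and InferenceServerException errors land here.
    responses.put({"result": result, "error": error})


client = grpcclient.InferenceServerClient("localhost:8001")
client.start_stream(callback=callback)

# Hypothetical input tensor controlling how many responses the model sends.
inputs = [grpcclient.InferInput("NUMBER_OF_RESPONSE_BEFORE_RETURN", [1], "UINT8")]
inputs[0].set_data_from_numpy(np.array([1], dtype=np.uint8))
client.async_stream_infer("response_sender", inputs=inputs)

client.stop_stream()  # drains in-flight responses before returning
client.close()

while not responses.empty():
    print(responses.get())
```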