Add test for improper response sending from model (#7292)
kthui authored Jun 4, 2024
1 parent d06dca2 commit 740ef3e
Showing 7 changed files with 183 additions and 40 deletions.
3 changes: 2 additions & 1 deletion qa/L0_backend_python/bls/test.sh
@@ -100,7 +100,8 @@ if [[ ${TEST_WINDOWS} == 0 ]]; then
echo "instance_group [ { kind: KIND_CPU} ]" >> models/libtorch_cpu/config.pbtxt

# Test with different sizes of CUDA memory pool
for CUDA_MEMORY_POOL_SIZE_MB in 64 128 ; do
# TODO: Why did 256 work in place of 128 on the decoupled data pipeline?
for CUDA_MEMORY_POOL_SIZE_MB in 64 256 ; do
CUDA_MEMORY_POOL_SIZE_BYTES=$((CUDA_MEMORY_POOL_SIZE_MB * 1024 * 1024))
SERVER_ARGS="--model-repository=${MODELDIR}/bls/models --backend-directory=${BACKEND_DIR} --log-verbose=1 --cuda-memory-pool-byte-size=0:${CUDA_MEMORY_POOL_SIZE_BYTES}"
for TRIAL in non_decoupled decoupled ; do
175 changes: 171 additions & 4 deletions qa/L0_backend_python/response_sender/response_sender_test.py
@@ -88,6 +88,20 @@ class ResponseSenderTest(unittest.TestCase):
"number_of_response_after_return": 0,
"send_complete_final_flag_after_return": False,
}
_inputs_parameters_one_response_pre_and_on_return = {
"number_of_response_before_return": 1,
"send_complete_final_flag_before_return": True,
"return_a_response": True,
"number_of_response_after_return": 0,
"send_complete_final_flag_after_return": False,
}
_inputs_parameters_one_response_on_and_post_return = {
"number_of_response_before_return": 0,
"send_complete_final_flag_before_return": False,
"return_a_response": True,
"number_of_response_after_return": 1,
"send_complete_final_flag_after_return": True,
}

def _get_inputs(
self,
@@ -195,6 +209,8 @@ def _assert_responses_exception(self, responses, expected_message):
self.assertIsNone(response["result"])
self.assertIsInstance(response["error"], InferenceServerException)
self.assertIn(expected_message, response["error"].message())
# There may be more responses, but currently only one is observed across all tests.
self.assertEqual(len(responses), 1)

def _assert_decoupled_infer_success(
self,
@@ -236,13 +252,16 @@ def _assert_decoupled_infer_success(
number_of_response_after_return,
)

def _assert_non_decoupled_infer_success(
def _assert_non_decoupled_infer_with_expected_response_success(
self,
number_of_response_before_return,
send_complete_final_flag_before_return,
return_a_response,
number_of_response_after_return,
send_complete_final_flag_after_return,
expected_number_of_response_before_return,
expected_return_a_response,
expected_number_of_response_after_return,
):
model_name = "response_sender"
responses = self._infer(
@@ -255,9 +274,9 @@
)
self._assert_responses_valid(
responses,
number_of_response_before_return,
return_a_response,
number_of_response_after_return,
expected_number_of_response_before_return,
expected_return_a_response,
expected_number_of_response_after_return,
)
# Do NOT group into a for-loop as it hides which model failed.
model_name = "response_sender_async"
@@ -271,9 +290,28 @@
)
self._assert_responses_valid(
responses,
expected_number_of_response_before_return,
expected_return_a_response,
expected_number_of_response_after_return,
)

def _assert_non_decoupled_infer_success(
self,
number_of_response_before_return,
send_complete_final_flag_before_return,
return_a_response,
number_of_response_after_return,
send_complete_final_flag_after_return,
):
self._assert_non_decoupled_infer_with_expected_response_success(
number_of_response_before_return,
send_complete_final_flag_before_return,
return_a_response,
number_of_response_after_return,
send_complete_final_flag_after_return,
expected_number_of_response_before_return=number_of_response_before_return,
expected_return_a_response=return_a_response,
expected_number_of_response_after_return=number_of_response_after_return,
)

# Decoupled model sends the response final flag before the request returns.
@@ -411,6 +449,135 @@ def test_decoupled_one_response_on_return(self):
# TODO: Test for async decoupled after fixing 'AsyncEventFutureDoneCallback'
# using `py_future.result()` with error hangs on exit.

# Decoupled model sends 1 response and returns 1 response.
def test_decoupled_one_response_pre_and_on_return(self):
# Note: The response sent before return is valid and closes the response
#       sender. Then, returning a response will generate an error, but since
#       the response sender is closed, nothing is passed to the client.
responses = self._infer(
model_name="response_sender_decoupled",
**self._inputs_parameters_one_response_pre_and_on_return,
)
self._assert_responses_valid(
responses,
number_of_response_before_return=1,
return_a_response=0,
number_of_response_after_return=0,
)
# TODO: Test for async decoupled after fixing 'AsyncEventFutureDoneCallback'
# using `py_future.result()` with error hangs on exit.

# Decoupled model returns 1 response and sends 1 response.
def test_decoupled_one_response_on_and_post_return(self):
# Note: The returned response will send an error response with the complete
#       final flag and close the response sender and factory. Then, sending a
#       response will raise an exception. Since the exception happens after the
#       model returns, it cannot be caught by the stub (i.e. in a daemon
#       thread), so nothing will happen.
responses = self._infer(
model_name="response_sender_decoupled",
**self._inputs_parameters_one_response_on_and_post_return,
)
self._assert_responses_exception(
responses,
expected_message="using the decoupled mode and the execute function must return None",
)
# TODO: Test for async decoupled after fixing 'AsyncEventFutureDoneCallback'
# using `py_future.result()` with error hangs on exit.

# Non-decoupled model sends the response final flag before the request returns.
def test_non_decoupled_zero_response_pre_return(self):
# Note: The final flag will raise an exception which stops the model. Since the
#       exception happens before the model returns, it will be caught by the
#       stub process, which passes it to the backend to send an error response
#       with the final flag.
expected_message = (
"Non-decoupled model cannot send complete final before sending a response"
)
model_name = "response_sender"
responses = self._infer(
model_name,
**self._inputs_parameters_zero_response_pre_return,
)
self._assert_responses_exception(responses, expected_message)
# Do NOT group into a for-loop as it hides which model failed.
model_name = "response_sender_async"
responses = self._infer(
model_name,
**self._inputs_parameters_zero_response_pre_return,
)
self._assert_responses_exception(responses, expected_message)

# Non-decoupled model sends the response final flag after the request returns.
@unittest.skip("Model unload will hang, see the TODO comment.")
def test_non_decoupled_zero_response_post_return(self):
# Note: The final flag will raise an exception which stops the model. Since the
# exception happens after the model returns, it cannot be caught by the
# stub (i.e. in a daemon thread), so nothing will happen.
# TODO: Since the stub does not know if the model failed after returning, the
#       complete final flag is not sent and unloading the model will hang.
#       How to detect such an event and close the response factory?
raise NotImplementedError("No testing is performed")

# Non-decoupled model sends 2 responses before return.
def test_non_decoupled_two_response_pre_return(self):
# Note: The 1st response will make its way to the client, but sending the 2nd
#       response will raise an exception which stops the model. Since the
#       exception happens before the model returns, it will be caught by the
#       stub process, which passes it to the backend to send an error response
#       with the final flag. Since this is a non-decoupled model using a gRPC
#       stream, any response after the 1st will be discarded by the frontend.
self._assert_non_decoupled_infer_with_expected_response_success(
**self._inputs_parameters_two_response_pre_return,
expected_number_of_response_before_return=1,
expected_return_a_response=False,
expected_number_of_response_after_return=0,
)

# Non-decoupled model sends 2 responses after return.
@unittest.skip("Model unload will hang, see the TODO comment.")
def test_non_decoupled_two_response_post_return(self):
# Note: The 1st response will make its way to the client, but sending the 2nd
# response will raise an exception which stops the model. Since the
# exception happens after the model returns, it cannot be caught by the
# stub (i.e. in a daemon thread), so nothing will happen.
# TODO: Since the stub does not know if the model failed after returning, the
#       complete final flag is not sent and unloading the model will hang.
#       How to detect such an event and close the response factory?
self._assert_non_decoupled_infer_with_expected_response_success(
**self._inputs_parameters_two_response_post_return,
expected_number_of_response_before_return=0,
expected_return_a_response=False,
expected_number_of_response_after_return=1,
)

# Non-decoupled model sends 1 response and returns 1 response.
def test_non_decoupled_one_response_pre_and_on_return(self):
# Note: The sent response will make its way to the client and complete the
#       final flag. The returned response will see the response sender is
#       closed and raise an exception. The backend should see the request is
#       closed and do nothing upon receiving the error from the stub.
self._assert_non_decoupled_infer_with_expected_response_success(
**self._inputs_parameters_one_response_pre_and_on_return,
expected_number_of_response_before_return=1,
expected_return_a_response=False,
expected_number_of_response_after_return=0,
)

# Non-decoupled model returns 1 response and sends 1 response.
def test_non_decoupled_one_response_on_and_post_return(self):
# Note: The returned response will send the response to the client and complete
#       the final flag. The sent response will see the response sender is
#       closed and raise an exception. Since the exception happens after the
#       model returns, it cannot be caught by the stub (i.e. in a daemon
#       thread), so nothing will happen.
self._assert_non_decoupled_infer_with_expected_response_success(
**self._inputs_parameters_one_response_on_and_post_return,
expected_number_of_response_before_return=0,
expected_return_a_response=True,
expected_number_of_response_after_return=0,
)


if __name__ == "__main__":
unittest.main()
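
For context, the improper usage these tests probe comes down to a Python backend model mixing the two response mechanisms: returning responses from execute() and pushing them through a response sender. The sketch below is illustrative only and not part of this commit; the tensor name OUTPUT0 and the fixed payload are assumptions.

# Illustrative sketch (not from this commit): a minimal Python backend model
# that delivers its response through the response sender and then marks the
# request complete with the final flag.
import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        for request in requests:
            # Hypothetical fixed output; a real model would compute it from inputs.
            out = pb_utils.Tensor("OUTPUT0", np.array([1.0], dtype=np.float32))
            response = pb_utils.InferenceResponse(output_tensors=[out])
            sender = request.get_response_sender()
            sender.send(response)
            sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        # Once the response sender has been used, execute() returns None;
        # also returning responses here is the improper usage the tests above flag.
        return None
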
3 changes: 2 additions & 1 deletion qa/L0_backend_python/test.sh
@@ -448,7 +448,8 @@ SUBTESTS="lifecycle argument_validation logging custom_metrics"
# [DLIS-6122] Disable model_control & request_rescheduling tests for Windows since they require load/unload
# [DLIS-6123] Disable examples test for Windows since it requires updates to the example clients
if [[ ${TEST_WINDOWS} == 0 ]]; then
SUBTESTS+=" restart model_control examples request_rescheduling"
# TODO: Reimplement restart on the decoupled data pipeline and re-enable the restart subtest.
SUBTESTS+=" model_control examples request_rescheduling"
fi
for TEST in ${SUBTESTS}; do
# Run each subtest in a separate virtual environment to avoid conflicts
2 changes: 1 addition & 1 deletion qa/python_models/response_sender/model.py
@@ -31,7 +31,7 @@

class TritonPythonModel:
def initialize(self, args):
self._common = ResponseSenderModelCommon(pb_utils, args)
self._common = ResponseSenderModelCommon(pb_utils)

def execute(self, requests):
return self._common.execute(requests, use_async=False)
2 changes: 1 addition & 1 deletion qa/python_models/response_sender/model_async.py
@@ -31,7 +31,7 @@

class TritonPythonModel:
def initialize(self, args):
self._common = ResponseSenderModelCommon(pb_utils, args)
self._common = ResponseSenderModelCommon(pb_utils)

async def execute(self, requests):
return self._common.execute(requests, use_async=True)
20 changes: 1 addition & 19 deletions qa/python_models/response_sender/model_common.py
@@ -25,19 +25,15 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
import json
import threading
import time

import numpy as np


class ResponseSenderModelCommon:
def __init__(self, pb_utils, args):
def __init__(self, pb_utils):
self._pb_utils = pb_utils
self._is_decoupled = pb_utils.using_decoupled_model_transaction_policy(
json.loads(args["model_config"])
)
self._background_tasks = set()

def _get_instructions_from_request(self, request):
@@ -123,20 +119,6 @@ def _send_responses(self, processed_requests, response_id_offset):
batch_size = request["batch_size"]
response_sender = request["response_sender"]
send_complete_final_flag = request["send_complete_final_flag"]

# TODO: gRPC frontend may segfault if non-decoupled model send response
# final flag separately from the response.
if (
not self._is_decoupled
and number_of_response == 1
and send_complete_final_flag
):
response_sender.send(
self._create_response(batch_size, response_id=response_id_offset),
flags=self._pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
)
continue

for response_id in range(number_of_response):
response_sender.send(
self._create_response(
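
The block removed above worked around a gRPC frontend segfault by folding the response and the final flag into a single send() call for non-decoupled models. Roughly, the two delivery patterns differ as in the sketch below (illustrative only, not from this commit):

# Illustrative sketch (not from this commit): two ways to deliver the
# complete-final flag through a response sender.
import triton_python_backend_utils as pb_utils


def finish_separately(response_sender, response):
    # Response first, then a flag-only send. This is the pattern the removed
    # workaround avoided for non-decoupled models.
    response_sender.send(response)
    response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)


def finish_combined(response_sender, response):
    # Response and final flag in one call, as the removed workaround did.
    response_sender.send(
        response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
    )
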
18 changes: 5 additions & 13 deletions src/grpc/stream_infer_handler.cc
@@ -564,7 +564,8 @@ ModelStreamInferHandler::StreamInferResponseComplete(
LOG_VERBOSE(1) << "ModelStreamInferHandler::StreamInferComplete, context "
<< state->context_->unique_id_ << ", " << state->unique_id_
<< " step " << state->step_ << ", callback index "
<< state->cb_count_ << ", flags " << flags;
<< state->cb_count_ << ", flags " << flags
<< ", response is nullptr " << (iresponse == nullptr);

#ifdef TRITON_ENABLE_TRACING
if (state->cb_count_ == 1) {
Expand All @@ -573,19 +574,8 @@ ModelStreamInferHandler::StreamInferResponseComplete(
}
#endif // TRITON_ENABLE_TRACING

// Log appropriate errors
bool is_complete =
state->complete_ || (flags & TRITONSERVER_RESPONSE_COMPLETE_FINAL) != 0;
if (!state->is_decoupled_) {
if (!is_complete) {
LOG_ERROR << "[INTERNAL] ModelStreamInfer received a response without "
"FINAL flag for a model with one-to-one transaction";
}
if (iresponse == nullptr) {
LOG_ERROR << "[INTERNAL] ModelStreamInfer received a null response for a "
"model with one-to-one transaction";
}
}

// If receiving the final callback then erase the state from the inflight
// state data structure to prevent cancellation being called on the request.
@@ -745,7 +735,9 @@
}
} else {
state->step_ = Steps::WRITEREADY;
state->context_->WriteResponseIfReady(state);
if (is_complete) {
state->context_->WriteResponseIfReady(state);
}
}

state->complete_ = is_complete;
