From 3c008c8fe024b08ae057a1f29cde18415b91524a Mon Sep 17 00:00:00 2001
From: Yingge He
Date: Mon, 21 Oct 2024 01:43:07 -0700
Subject: [PATCH] Allow multiple responses per request. Add test for
 non-decoupled model metrics.

---
 .../async_execute/1/model.py                  |  74 ++++++++++++
 .../async_execute/config.pbtxt                |  43 +++++++
 .../async_execute_decouple/1/model.py         |  39 +++++--
 .../async_execute_decouple/config.pbtxt       |   8 +-
 .../ensemble_decoupled/ensemble/config.pbtxt  |  15 ++-
 qa/L0_metrics/histogram_metrics_test.py       | 108 +++++++++++-------
 qa/L0_metrics/test.sh                         |   5 +-
 7 files changed, 227 insertions(+), 65 deletions(-)
 create mode 100644 qa/L0_metrics/ensemble_decoupled/async_execute/1/model.py
 create mode 100644 qa/L0_metrics/ensemble_decoupled/async_execute/config.pbtxt

diff --git a/qa/L0_metrics/ensemble_decoupled/async_execute/1/model.py b/qa/L0_metrics/ensemble_decoupled/async_execute/1/model.py
new file mode 100644
index 0000000000..4b00b2abfb
--- /dev/null
+++ b/qa/L0_metrics/ensemble_decoupled/async_execute/1/model.py
@@ -0,0 +1,74 @@
+# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import asyncio
+
+import numpy as np
+import triton_python_backend_utils as pb_utils
+
+
+class TritonPythonModel:
+    async def execute(self, requests):
+        processed_requests = []
+        async_tasks = []
+        for request in requests:
+            # Get input data
+            wait_secs_tensor = pb_utils.get_input_tensor_by_name(
+                request, "WAIT_SECONDS"
+            ).as_numpy()
+
+            # Validate input data
+            wait_secs = wait_secs_tensor[0]
+            if wait_secs < 0:
+                self.raise_value_error(requests, "wait_secs cannot be negative")
+            async_tasks.append(asyncio.create_task(asyncio.sleep(wait_secs)))
+            processed_requests.append(
+                {
+                    "response_sender": request.get_response_sender(),
+                }
+            )
+
+        # This async execute should be scheduled to run in the background
+        # concurrently with other instances of async execute, as long as the
+        # event loop is not blocked.
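+        # asyncio.gather() suspends this coroutine until all sleep tasks have
+        # completed, yielding the event loop to any other in-flight execute()
+        # calls in the meantime.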
+ await asyncio.gather(*async_tasks) + + for p_req in processed_requests: + response_sender = p_req["response_sender"] + + output_tensors = pb_utils.Tensor("DUMMY_OUT", np.array([0], np.float32)) + response = pb_utils.InferenceResponse(output_tensors=[output_tensors]) + response_sender.send( + response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL + ) + + return None + + def raise_value_error(self, requests, msg): + for request in requests: + response_sender = request.get_response_sender() + response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL) + raise ValueError(msg) diff --git a/qa/L0_metrics/ensemble_decoupled/async_execute/config.pbtxt b/qa/L0_metrics/ensemble_decoupled/async_execute/config.pbtxt new file mode 100644 index 0000000000..921fb9da0a --- /dev/null +++ b/qa/L0_metrics/ensemble_decoupled/async_execute/config.pbtxt @@ -0,0 +1,43 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
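+
+# This model is non-decoupled: model_transaction_policy is omitted, so Triton
+# applies the default transaction policy of exactly one response per request.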
+ +backend: "python" +input [ + { + name: "WAIT_SECONDS" + data_type: TYPE_FP32 + dims: [ 1 ] + } +] +output [ + { + name: "DUMMY_OUT" + data_type: TYPE_FP32 + dims: [ 1 ] + } +] + +instance_group [{ kind: KIND_CPU }] diff --git a/qa/L0_metrics/ensemble_decoupled/async_execute_decouple/1/model.py b/qa/L0_metrics/ensemble_decoupled/async_execute_decouple/1/model.py index 5743768ae9..9a022bf3d7 100644 --- a/qa/L0_metrics/ensemble_decoupled/async_execute_decouple/1/model.py +++ b/qa/L0_metrics/ensemble_decoupled/async_execute_decouple/1/model.py @@ -35,16 +35,27 @@ async def execute(self, requests): processed_requests = [] async_tasks = [] for request in requests: + # Get input data wait_secs_tensor = pb_utils.get_input_tensor_by_name( request, "WAIT_SECONDS" ).as_numpy() - for wait_secs in wait_secs_tensor: - if wait_secs < 0: - self.raise_value_error(requests) - async_tasks.append(asyncio.create_task(asyncio.sleep(wait_secs))) + response_num_tensor = pb_utils.get_input_tensor_by_name( + request, "RESPONSE_NUM" + ).as_numpy() + + # Validate input data + wait_secs = wait_secs_tensor[0] + if wait_secs < 0: + self.raise_value_error(requests, "wait_secs cannot be negative") + response_num = response_num_tensor[0] + if response_num < 1: + self.raise_value_error(requests, "response_num cannot be less than one") + async_tasks.append(asyncio.create_task(asyncio.sleep(wait_secs))) + processed_requests.append( { "wait_secs": wait_secs, + "response_num": response_num, "response_sender": request.get_response_sender(), } ) @@ -56,21 +67,25 @@ async def execute(self, requests): for p_req in processed_requests: wait_secs = p_req["wait_secs"] + response_num = p_req["response_num"] response_sender = p_req["response_sender"] output_tensors = pb_utils.Tensor( - "DUMMY_OUT", np.array([wait_secs], np.float32) - ) - response = pb_utils.InferenceResponse(output_tensors=[output_tensors]) - response_sender.send( - response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL + "WAIT_SECONDS", np.array([wait_secs], np.float32) ) + for i in range(response_num): + response = pb_utils.InferenceResponse(output_tensors=[output_tensors]) + if i != response_num - 1: + response_sender.send(response) + else: + response_sender.send( + response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL + ) return None - def raise_value_error(self, requests): - # TODO: Model may raise exception without sending complete final + def raise_value_error(self, requests, msg): for request in requests: response_sender = request.get_response_sender() response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL) - raise ValueError("wait_secs cannot be negative") + raise ValueError(msg) diff --git a/qa/L0_metrics/ensemble_decoupled/async_execute_decouple/config.pbtxt b/qa/L0_metrics/ensemble_decoupled/async_execute_decouple/config.pbtxt index c0cf23eafa..af1e941e41 100644 --- a/qa/L0_metrics/ensemble_decoupled/async_execute_decouple/config.pbtxt +++ b/qa/L0_metrics/ensemble_decoupled/async_execute_decouple/config.pbtxt @@ -30,11 +30,16 @@ input [ name: "WAIT_SECONDS" data_type: TYPE_FP32 dims: [ 1 ] + }, + { + name: "RESPONSE_NUM" + data_type: TYPE_UINT8 + dims: [ 1 ] } ] output [ { - name: "DUMMY_OUT" + name: "WAIT_SECONDS" data_type: TYPE_FP32 dims: [ 1 ] } @@ -42,4 +47,3 @@ output [ instance_group [{ kind: KIND_CPU }] model_transaction_policy { decoupled: True } - diff --git a/qa/L0_metrics/ensemble_decoupled/ensemble/config.pbtxt b/qa/L0_metrics/ensemble_decoupled/ensemble/config.pbtxt index 83eda3807b..16e2c9b13a 100644 --- 
a/qa/L0_metrics/ensemble_decoupled/ensemble/config.pbtxt
+++ b/qa/L0_metrics/ensemble_decoupled/ensemble/config.pbtxt
@@ -28,9 +28,14 @@ name: "ensemble"
 platform: "ensemble"
 input [
   {
-    name: "INPUT"
+    name: "INPUT0"
     data_type: TYPE_FP32
     dims: [ 1 ]
+  },
+  {
+    name: "INPUT1"
+    data_type: TYPE_UINT8
+    dims: [ 1 ]
   }
 ]
 output [
@@ -48,10 +53,14 @@ ensemble_scheduling {
       model_version: 1
       input_map {
         key: "WAIT_SECONDS"
-        value: "INPUT"
+        value: "INPUT0"
+      }
+      input_map {
+        key: "RESPONSE_NUM"
+        value: "INPUT1"
       }
       output_map {
-        key: "DUMMY_OUT"
+        key: "WAIT_SECONDS"
         value: "temp_output"
       }
     },
diff --git a/qa/L0_metrics/histogram_metrics_test.py b/qa/L0_metrics/histogram_metrics_test.py
index 1ff3074671..2eb7c0d8b4 100755
--- a/qa/L0_metrics/histogram_metrics_test.py
+++ b/qa/L0_metrics/histogram_metrics_test.py
@@ -76,66 +76,75 @@ def get_histogram_metrics(self, metric_family: str):
         return histogram_dict
 
     def async_stream_infer(self, model_name, inputs, outputs):
-        triton_client = grpcclient.InferenceServerClient(url="localhost:8001")
-
-        # Define the callback function. Note the last two parameters should be
-        # result and error. InferenceServerClient would povide the results of an
-        # inference as grpcclient.InferResult in result. For successful
-        # inference, error will be None, otherwise it will be an object of
-        # tritonclientutils.InferenceServerException holding the error details
-        def callback(user_data, result, error):
-            if error:
-                user_data.append(error)
-            else:
-                user_data.append(result)
-
-        # list to hold the results of inference.
-        user_data = []
-
-        # Inference call
-        triton_client.start_stream(callback=partial(callback, user_data))
-        triton_client.async_stream_infer(
-            model_name=model_name,
-            inputs=inputs,
-            outputs=outputs,
-        )
-        triton_client.stop_stream()
-
-        # Wait until the results are available in user_data
-        time_out = 10
-        while (len(user_data) == 0) and time_out > 0:
-            time_out = time_out - 1
-            time.sleep(1)
-
-        # Display and validate the available results
-        if len(user_data) == 1:
-            # Check for the errors
-            self.assertNotIsInstance(user_data[0], InferenceServerException)
+        with grpcclient.InferenceServerClient(url="localhost:8001") as triton_client:
+            # Define the callback function. Note that the last two parameters
+            # are result and error. InferenceServerClient provides the results
+            # of an inference as grpcclient.InferResult in result. For a
+            # successful inference, error will be None; otherwise it will be a
+            # tritonclientutils.InferenceServerException holding the error
+            # details.
+            def callback(user_data, result, error):
+                if error:
+                    user_data.append(error)
+                else:
+                    user_data.append(result)
+
+            # List to hold the results of inference.
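+            # Each callback invocation appends one entry, so a stream that
+            # yields N responses leaves N items here.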
+ user_data = [] + + # Inference call + triton_client.start_stream(callback=partial(callback, user_data)) + triton_client.async_stream_infer( + model_name=model_name, + inputs=inputs, + outputs=outputs, + ) + triton_client.stop_stream() + + # Wait until the results are available in user_data + time_out = 10 + while (len(user_data) == 0) and time_out > 0: + time_out = time_out - 1 + time.sleep(1) + + # Validate the results + for i in range(len(user_data)): + # Check for the errors + self.assertNotIsInstance( + user_data[i], InferenceServerException, user_data[i] + ) def test_ensemble_decoupled(self): - ensemble_model_name = "ensemble" - wait_secs = 1 + wait_secs = 0.1 + responses_per_req = 3 + total_reqs = 3 # Infer inputs = [] outputs = [] - inputs.append(grpcclient.InferInput("INPUT", [1], "FP32")) + inputs.append(grpcclient.InferInput("INPUT0", [1], "FP32")) + inputs.append(grpcclient.InferInput("INPUT1", [1], "UINT8")) outputs.append(grpcclient.InferRequestedOutput("OUTPUT")) - # Create the data for the input tensor. Initialize to all ones. - input_data = np.ones(shape=(1), dtype=np.float32) * wait_secs + # Create the data for the input tensor. + input_data_0 = np.array([wait_secs], np.float32) + input_data_1 = np.array([responses_per_req], np.uint8) + # Initialize the data - inputs[0].set_data_from_numpy(input_data) + inputs[0].set_data_from_numpy(input_data_0) + inputs[1].set_data_from_numpy(input_data_1) - # Send 3 requests to ensemble decoupled model - for request_num in range(1, 4): + # Send requests to ensemble decoupled model + for request_num in range(1, total_reqs + 1): + ensemble_model_name = "ensemble" + decoupled_model_name = "async_execute_decouple" + non_decoupled_model_name = "async_execute" self.async_stream_infer(ensemble_model_name, inputs, outputs) # Checks metrics output first_response_family = "nv_inference_first_response_histogram_ms" - decoupled_model_name = "async_execute_decouple" histogram_dict = self.get_histogram_metrics(first_response_family) + # Test ensemble model metrics ensemble_model_count = get_histogram_metric_key( first_response_family, ensemble_model_name, "1", "count" ) @@ -150,6 +159,7 @@ def test_ensemble_decoupled(self): 2 * wait_secs * MILLIS_PER_SEC * request_num, ) + # Test decoupled model metrics decoupled_model_count = get_histogram_metric_key( first_response_family, decoupled_model_name, "1", "count" ) @@ -164,6 +174,16 @@ def test_ensemble_decoupled(self): wait_secs * MILLIS_PER_SEC * request_num, ) + # Test non-decoupled model metrics + non_decoupled_model_count = get_histogram_metric_key( + first_response_family, non_decoupled_model_name, "1", "count" + ) + non_decoupled_model_sum = get_histogram_metric_key( + first_response_family, non_decoupled_model_name, "1", "sum" + ) + self.assertNotIn(non_decoupled_model_count, histogram_dict) + self.assertNotIn(non_decoupled_model_sum, histogram_dict) + if __name__ == "__main__": unittest.main() diff --git a/qa/L0_metrics/test.sh b/qa/L0_metrics/test.sh index da86993a8d..230b593482 100755 --- a/qa/L0_metrics/test.sh +++ b/qa/L0_metrics/test.sh @@ -458,13 +458,11 @@ kill_server expected_tests=6 check_unit_test "${expected_tests}" -# Test histogram data in ensemble decoupled model +### Test histogram data in ensemble decoupled model ### MODELDIR="${PWD}/ensemble_decoupled" SERVER_ARGS="--model-repository=${MODELDIR} --metrics-config histogram_latencies=true --log-verbose=1" PYTHON_TEST="histogram_metrics_test.py" mkdir -p "${MODELDIR}"/ensemble/1 -cp -r "${MODELDIR}"/async_execute_decouple 
"${MODELDIR}"/async_execute -sed -i "s/model_transaction_policy { decoupled: True }//" "${MODELDIR}"/async_execute/config.pbtxt run_and_check_server python3 ${PYTHON_TEST} 2>&1 | tee ${CLIENT_LOG} @@ -478,4 +476,3 @@ else fi exit $RET -