Commit

Allow multiple responses per request. Add test for non-decoupled model metrics.
yinggeh committed Oct 21, 2024
1 parent 1148209 commit 3c008c8
Showing 7 changed files with 227 additions and 65 deletions.
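In short: the decoupled test model gains a RESPONSE_NUM input and now emits that many responses per request, its output is renamed from DUMMY_OUT to WAIT_SECONDS, and a non-decoupled async_execute model is added so the test can check that no nv_inference_first_response_histogram_ms entries are reported for it. The sketch below is illustrative only and not part of this commit; it assumes a local Triton server at localhost:8001 with the qa/L0_metrics "ensemble" model loaded, and uses the ensemble inputs INPUT0 (wait seconds, FP32) and INPUT1 (responses per request, UINT8) introduced by this change.

```python
# Illustrative client sketch (not part of this commit). Assumes a local Triton
# server at localhost:8001 with the qa/L0_metrics "ensemble" model loaded.
import time

import numpy as np
import tritonclient.grpc as grpcclient

responses = []

def callback(result, error):
    # Collect either the streamed result or the error for later inspection.
    responses.append(error if error else result)

with grpcclient.InferenceServerClient(url="localhost:8001") as client:
    inputs = [
        grpcclient.InferInput("INPUT0", [1], "FP32"),   # seconds each model waits
        grpcclient.InferInput("INPUT1", [1], "UINT8"),  # responses per request
    ]
    inputs[0].set_data_from_numpy(np.array([0.1], np.float32))
    inputs[1].set_data_from_numpy(np.array([3], np.uint8))
    outputs = [grpcclient.InferRequestedOutput("OUTPUT")]

    client.start_stream(callback=callback)
    client.async_stream_infer(model_name="ensemble", inputs=inputs, outputs=outputs)
    client.stop_stream()

    # Give the stream a moment to deliver any in-flight responses, as the test does.
    timeout = 10
    while not responses and timeout > 0:
        time.sleep(1)
        timeout -= 1

print(f"received {len(responses)} response(s)")
```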
74 changes: 74 additions & 0 deletions qa/L0_metrics/ensemble_decoupled/async_execute/1/model.py
@@ -0,0 +1,74 @@
# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio

import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    async def execute(self, requests):
        processed_requests = []
        async_tasks = []
        for request in requests:
            # Get input data
            wait_secs_tensor = pb_utils.get_input_tensor_by_name(
                request, "WAIT_SECONDS"
            ).as_numpy()

            # Validate input data
            wait_secs = wait_secs_tensor[0]
            if wait_secs < 0:
                self.raise_value_error(requests, "wait_secs cannot be negative")
            async_tasks.append(asyncio.create_task(asyncio.sleep(wait_secs)))
            processed_requests.append(
                {
                    "response_sender": request.get_response_sender(),
                }
            )

        # This async execute should be scheduled to run in the background
        # concurrently with other in-flight executes, as long as the event loop
        # is not blocked.
        await asyncio.gather(*async_tasks)

        for p_req in processed_requests:
            response_sender = p_req["response_sender"]

            output_tensors = pb_utils.Tensor("DUMMY_OUT", np.array([0], np.float32))
            response = pb_utils.InferenceResponse(output_tensors=[output_tensors])
            response_sender.send(
                response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
            )

        return None

    def raise_value_error(self, requests, msg):
        for request in requests:
            response_sender = request.get_response_sender()
            response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        raise ValueError(msg)
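The scheduling comment in this model points at the Python backend's async execute behavior: while one execute() call awaits its sleep, the event loop is free to run other in-flight executes. A standalone illustration with plain asyncio (no Triton dependency; the 1-second figures are only for demonstration):

```python
# Two 1-second "requests" awaited together finish in roughly 1 second overall,
# not 2, because the awaits run concurrently on the event loop.
import asyncio
import time

async def fake_execute(wait_secs):
    # Stand-in for one execute() call that awaits asyncio.sleep(wait_secs).
    await asyncio.sleep(wait_secs)

async def main():
    start = time.monotonic()
    await asyncio.gather(fake_execute(1.0), fake_execute(1.0))
    print(f"elapsed: {time.monotonic() - start:.2f}s")  # ~1.00s

asyncio.run(main())
```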
43 changes: 43 additions & 0 deletions qa/L0_metrics/ensemble_decoupled/async_execute/config.pbtxt
@@ -0,0 +1,43 @@
# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

backend: "python"
input [
{
name: "WAIT_SECONDS"
data_type: TYPE_FP32
dims: [ 1 ]
}
]
output [
{
name: "DUMMY_OUT"
data_type: TYPE_FP32
dims: [ 1 ]
}
]

instance_group [{ kind: KIND_CPU }]
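Since this config omits model_transaction_policy, async_execute is a non-decoupled model that returns exactly one response per request, and the test later checks that no first-response histogram entries are reported for it. A plain, blocking inference call is enough to exercise it; the sketch below is my own illustration (not part of the commit) and assumes the model is loaded on a local server at the default gRPC port.

```python
# Illustrative only: a single blocking inference against the non-decoupled
# async_execute model defined above, on a local Triton server.
import numpy as np
import tritonclient.grpc as grpcclient

client = grpcclient.InferenceServerClient(url="localhost:8001")
wait_input = grpcclient.InferInput("WAIT_SECONDS", [1], "FP32")
wait_input.set_data_from_numpy(np.array([0.1], np.float32))

result = client.infer(
    model_name="async_execute",
    inputs=[wait_input],
    outputs=[grpcclient.InferRequestedOutput("DUMMY_OUT")],
)
print(result.as_numpy("DUMMY_OUT"))  # array([0.], dtype=float32)
```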
39 changes: 27 additions & 12 deletions qa/L0_metrics/ensemble_decoupled/async_execute_decouple/1/model.py
@@ -35,16 +35,27 @@ async def execute(self, requests):
processed_requests = []
async_tasks = []
for request in requests:
# Get input data
wait_secs_tensor = pb_utils.get_input_tensor_by_name(
request, "WAIT_SECONDS"
).as_numpy()
for wait_secs in wait_secs_tensor:
if wait_secs < 0:
self.raise_value_error(requests)
async_tasks.append(asyncio.create_task(asyncio.sleep(wait_secs)))
response_num_tensor = pb_utils.get_input_tensor_by_name(
request, "RESPONSE_NUM"
).as_numpy()

# Validate input data
wait_secs = wait_secs_tensor[0]
if wait_secs < 0:
self.raise_value_error(requests, "wait_secs cannot be negative")
response_num = response_num_tensor[0]
if response_num < 1:
self.raise_value_error(requests, "response_num cannot be less than one")
async_tasks.append(asyncio.create_task(asyncio.sleep(wait_secs)))

processed_requests.append(
{
"wait_secs": wait_secs,
"response_num": response_num,
"response_sender": request.get_response_sender(),
}
)
@@ -56,21 +67,25 @@ async def execute(self, requests):

for p_req in processed_requests:
wait_secs = p_req["wait_secs"]
response_num = p_req["response_num"]
response_sender = p_req["response_sender"]

output_tensors = pb_utils.Tensor(
"DUMMY_OUT", np.array([wait_secs], np.float32)
)
response = pb_utils.InferenceResponse(output_tensors=[output_tensors])
response_sender.send(
response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
"WAIT_SECONDS", np.array([wait_secs], np.float32)
)
for i in range(response_num):
response = pb_utils.InferenceResponse(output_tensors=[output_tensors])
if i != response_num - 1:
response_sender.send(response)
else:
response_sender.send(
response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
)

return None

def raise_value_error(self, requests):
# TODO: Model may raise exception without sending complete final
def raise_value_error(self, requests, msg):
for request in requests:
response_sender = request.get_response_sender()
response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
raise ValueError("wait_secs cannot be negative")
raise ValueError(msg)
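The new loop above attaches the TRITONSERVER_RESPONSE_COMPLETE_FINAL flag to the last data-bearing response. An equivalent pattern, shown only as a sketch of my own (it reuses the same pb_utils calls the model already relies on and would run inside the Python backend), is to send every response unflagged and then finish with a flags-only send, just as raise_value_error does:

```python
# Sketch only: an alternative to flagging the last response. Runs inside the
# Triton Python backend, which provides triton_python_backend_utils.
import triton_python_backend_utils as pb_utils

def send_n_responses(response_sender, output_tensors, response_num):
    # Send every data-bearing response without flags...
    for _ in range(response_num):
        response_sender.send(pb_utils.InferenceResponse(output_tensors=output_tensors))
    # ...then mark the request complete with a flags-only send.
    response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
```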
qa/L0_metrics/ensemble_decoupled/async_execute_decouple/config.pbtxt
@@ -30,16 +30,20 @@ input [
name: "WAIT_SECONDS"
data_type: TYPE_FP32
dims: [ 1 ]
},
{
name: "RESPONSE_NUM"
data_type: TYPE_UINT8
dims: [ 1 ]
}
]
output [
{
name: "DUMMY_OUT"
name: "WAIT_SECONDS"
data_type: TYPE_FP32
dims: [ 1 ]
}
]

instance_group [{ kind: KIND_CPU }]
model_transaction_policy { decoupled: True }
15 changes: 12 additions & 3 deletions qa/L0_metrics/ensemble_decoupled/ensemble/config.pbtxt
@@ -28,9 +28,14 @@ name: "ensemble"
platform: "ensemble"
input [
{
name: "INPUT"
name: "INPUT0"
data_type: TYPE_FP32
dims: [ 1 ]
},
{
name: "INPUT1"
data_type: TYPE_UINT8
dims: [ 1 ]
}
]
output [
@@ -48,10 +53,14 @@ ensemble_scheduling {
model_version: 1
input_map {
key: "WAIT_SECONDS"
value: "INPUT"
value: "INPUT0"
}
input_map {
key: "RESPONSE_NUM"
value: "INPUT1"
}
output_map {
key: "DUMMY_OUT"
key: "WAIT_SECONDS"
value: "temp_output"
}
},
108 changes: 64 additions & 44 deletions qa/L0_metrics/histogram_metrics_test.py
@@ -76,66 +76,75 @@ def get_histogram_metrics(self, metric_family: str):
return histogram_dict

def async_stream_infer(self, model_name, inputs, outputs):
triton_client = grpcclient.InferenceServerClient(url="localhost:8001")

# Define the callback function. Note the last two parameters should be
# result and error. InferenceServerClient would provide the results of an
# inference as grpcclient.InferResult in result. For successful
# inference, error will be None, otherwise it will be an object of
# tritonclientutils.InferenceServerException holding the error details
def callback(user_data, result, error):
if error:
user_data.append(error)
else:
user_data.append(result)

# list to hold the results of inference.
user_data = []

# Inference call
triton_client.start_stream(callback=partial(callback, user_data))
triton_client.async_stream_infer(
model_name=model_name,
inputs=inputs,
outputs=outputs,
)
triton_client.stop_stream()

# Wait until the results are available in user_data
time_out = 10
while (len(user_data) == 0) and time_out > 0:
time_out = time_out - 1
time.sleep(1)

# Display and validate the available results
if len(user_data) == 1:
# Check for the errors
self.assertNotIsInstance(user_data[0], InferenceServerException)
with grpcclient.InferenceServerClient(url="localhost:8001") as triton_client:
# Define the callback function. Note the last two parameters should be
# result and error. InferenceServerClient would provide the results of an
# inference as grpcclient.InferResult in result. For successful
# inference, error will be None, otherwise it will be an object of
# tritonclientutils.InferenceServerException holding the error details
def callback(user_data, result, error):
if error:
user_data.append(error)
else:
user_data.append(result)

# list to hold the results of inference.
user_data = []

# Inference call
triton_client.start_stream(callback=partial(callback, user_data))
triton_client.async_stream_infer(
model_name=model_name,
inputs=inputs,
outputs=outputs,
)
triton_client.stop_stream()

# Wait until the results are available in user_data
time_out = 10
while (len(user_data) == 0) and time_out > 0:
time_out = time_out - 1
time.sleep(1)

# Validate the results
for i in range(len(user_data)):
# Check for the errors
self.assertNotIsInstance(
user_data[i], InferenceServerException, user_data[i]
)

def test_ensemble_decoupled(self):
ensemble_model_name = "ensemble"
wait_secs = 1
wait_secs = 0.1
responses_per_req = 3
total_reqs = 3

# Infer
inputs = []
outputs = []
inputs.append(grpcclient.InferInput("INPUT", [1], "FP32"))
inputs.append(grpcclient.InferInput("INPUT0", [1], "FP32"))
inputs.append(grpcclient.InferInput("INPUT1", [1], "UINT8"))
outputs.append(grpcclient.InferRequestedOutput("OUTPUT"))

# Create the data for the input tensor. Initialize to all ones.
input_data = np.ones(shape=(1), dtype=np.float32) * wait_secs
# Create the data for the input tensor.
input_data_0 = np.array([wait_secs], np.float32)
input_data_1 = np.array([responses_per_req], np.uint8)

# Initialize the data
inputs[0].set_data_from_numpy(input_data)
inputs[0].set_data_from_numpy(input_data_0)
inputs[1].set_data_from_numpy(input_data_1)

# Send 3 requests to ensemble decoupled model
for request_num in range(1, 4):
# Send requests to ensemble decoupled model
for request_num in range(1, total_reqs + 1):
ensemble_model_name = "ensemble"
decoupled_model_name = "async_execute_decouple"
non_decoupled_model_name = "async_execute"
self.async_stream_infer(ensemble_model_name, inputs, outputs)

# Checks metrics output
first_response_family = "nv_inference_first_response_histogram_ms"
decoupled_model_name = "async_execute_decouple"
histogram_dict = self.get_histogram_metrics(first_response_family)

# Test ensemble model metrics
ensemble_model_count = get_histogram_metric_key(
first_response_family, ensemble_model_name, "1", "count"
)
@@ -150,6 +159,7 @@ def test_ensemble_decoupled(self):
2 * wait_secs * MILLIS_PER_SEC * request_num,
)

# Test decoupled model metrics
decoupled_model_count = get_histogram_metric_key(
first_response_family, decoupled_model_name, "1", "count"
)
@@ -164,6 +174,16 @@ def test_ensemble_decoupled(self):
wait_secs * MILLIS_PER_SEC * request_num,
)

# Test non-decoupled model metrics
non_decoupled_model_count = get_histogram_metric_key(
first_response_family, non_decoupled_model_name, "1", "count"
)
non_decoupled_model_sum = get_histogram_metric_key(
first_response_family, non_decoupled_model_name, "1", "sum"
)
self.assertNotIn(non_decoupled_model_count, histogram_dict)
self.assertNotIn(non_decoupled_model_sum, histogram_dict)


if __name__ == "__main__":
unittest.main()
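For manual verification, the histogram family asserted above can be read straight off Triton's Prometheus metrics endpoint. The sketch below is my own illustration, assuming the default metrics port 8002 at /metrics and that the third-party requests package is available; label formatting can vary between Triton versions.

```python
# Quick manual check of the metric family used by this test. Assumes Triton's
# metrics endpoint is at the default http://localhost:8002/metrics and that
# the 'requests' package is installed.
import requests

text = requests.get("http://localhost:8002/metrics", timeout=5).text
for line in text.splitlines():
    if line.startswith("nv_inference_first_response_histogram_ms"):
        print(line)

# With wait_secs = 0.1 and three requests, the test expects cumulative sums on
# the order of 2 * 100 ms per request for "ensemble" and 100 ms per request for
# "async_execute_decouple", and no entries at all for the non-decoupled
# "async_execute" model.
```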