Decoupled Async Execute (#7062) (#7100)
* Add async execute decoupled test

* Add decoupled bls async exec test

* Enhance test with different durations for concurrent executes
kthui authored Apr 11, 2024
1 parent 2ced472 commit 3ae2edb
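
In brief, the pattern under test: the Python backend accepts `async def execute`, so a model that awaits instead of blocking yields the event loop and lets other in-flight executions on the same instance overlap. A condensed sketch of the model added below (per-request waits only; the committed model.py additionally handles batched inputs and the negative-input error path):

import asyncio

import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    async def execute(self, requests):
        senders = [request.get_response_sender() for request in requests]
        waits = [
            float(
                pb_utils.get_input_tensor_by_name(request, "WAIT_SECONDS").as_numpy()[0][0]
            )
            for request in requests
        ]
        # Awaiting yields the event loop, so other in-flight execute() calls on
        # this instance keep running while these requests sleep.
        await asyncio.gather(*(asyncio.sleep(w) for w in waits))
        for sender in senders:
            out = pb_utils.Tensor("DUMMY_OUT", np.zeros(1, np.float32))
            sender.send(
                pb_utils.InferenceResponse(output_tensors=[out]),
                flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
            )
        return None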
Showing 7 changed files with 462 additions and 1 deletion.
161 changes: 161 additions & 0 deletions qa/L0_backend_python/async_execute/concurrency_test.py
@@ -0,0 +1,161 @@
# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
import time
import unittest

import numpy as np
import tritonclient.grpc as grpcclient


class ConcurrencyTest(unittest.TestCase):
def setUp(self):
# Initialize client
self._triton = grpcclient.InferenceServerClient("localhost:8001")

def _generate_streaming_callback_and_response_pair(self):
response = [] # [{"result": result, "error": error}, ...]

def callback(result, error):
response.append({"result": result, "error": error})

return callback, response

# Helper for testing concurrent execution
def _concurrent_execute_requests(self, model_name, batch_size, number_of_requests):
delay_secs = 4
shape = [batch_size, 1]
inputs = [grpcclient.InferInput("WAIT_SECONDS", shape, "FP32")]
inputs[0].set_data_from_numpy(np.full(shape, delay_secs, dtype=np.float32))

callback, response = self._generate_streaming_callback_and_response_pair()
self._triton.start_stream(callback)
for i in range(number_of_requests):
self._triton.async_stream_infer(model_name, inputs)

        # Allow 2s for the requests to reach the model and 2s for the responses to return.
wait_secs = 2 + delay_secs + 2
time.sleep(wait_secs)
# Ensure the sleep is shorter than sequential processing delay.
sequential_min_delay = wait_secs * batch_size * number_of_requests
self.assertLessEqual(wait_secs, sequential_min_delay)
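        # (Concretely: each item sleeps delay_secs = 4 secs in the model, and the
        # tests below send 4 items in total, so sequential execution would need
        # at least 16 secs, twice the 8 secs slept here.)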

        # If the requests were executed sequentially, no results would be available
        # yet, so receiving all of the correct responses here demonstrates
        # concurrent execution.
self.assertEqual(len(response), number_of_requests)
for res in response:
self.assertEqual(res["result"].as_numpy("DUMMY_OUT").shape[0], batch_size)
self.assertIsNone(res["error"])

self._triton.stop_stream()

# Test batched requests are executed concurrently
def test_concurrent_execute_single_request(self):
self._concurrent_execute_requests(
model_name="async_execute_decouple", batch_size=4, number_of_requests=1
)

# Test multiple requests are executed concurrently
def test_concurrent_execute_multi_request(self):
self._concurrent_execute_requests(
model_name="async_execute_decouple", batch_size=1, number_of_requests=4
)

# Test batched requests are executed concurrently via bls
def test_concurrent_execute_single_request_bls(self):
self._concurrent_execute_requests(
model_name="async_execute_decouple_bls", batch_size=4, number_of_requests=1
)

# Test multiple requests are executed concurrently via bls
def test_concurrent_execute_multi_request_bls(self):
self._concurrent_execute_requests(
model_name="async_execute_decouple_bls", batch_size=1, number_of_requests=4
)

    # Test that requests with a shorter duration return first
def test_concurrent_execute_different_duration(self):
model_name = "async_execute_decouple"
callback, response = self._generate_streaming_callback_and_response_pair()
self._triton.start_stream(callback)

        # Send 2 requests with different delays
shape = [1, 1]
for delay_secs in [10, 2]:
inputs = [grpcclient.InferInput("WAIT_SECONDS", shape, "FP32")]
inputs[0].set_data_from_numpy(np.full(shape, delay_secs, dtype=np.float32))
self._triton.async_stream_infer(model_name, inputs)
time.sleep(2) # leave a gap after each inference
            shape[0] += 1  # vary batch size so each response identifies its request

        # The last request executes for 2 secs; leave an additional 2 secs for
        # sending the request and 2 secs for receiving its response. Since 2 secs
        # have already elapsed after sending it, wait for another 4 secs.
time.sleep(4)
        # The response to the last request should be available by now, while the
        # first request executes for 10 secs and only 8 secs have elapsed, so its
        # response should not be available yet.
self.assertEqual(len(response), 1)
self.assertEqual(response[0]["result"].as_numpy("DUMMY_OUT").shape[0], 2)
self.assertIsNone(response[0]["error"])

        # The first request executes for 10 secs; leave an additional 2 secs for
        # sending the request and 2 secs for receiving its response. Since 8 secs
        # have already elapsed after sending it, wait for another 6 secs.
time.sleep(6)
# The response of the first request should be available by now.
self.assertEqual(len(response), 2)
self.assertEqual(response[1]["result"].as_numpy("DUMMY_OUT").shape[0], 1)
self.assertIsNone(response[1]["error"])

self._triton.stop_stream()

# Test model exception handling
def test_model_raise_exception(self):
model_name = "async_execute_decouple"
delay_secs = -1 # model will raise exception
shape = [1, 1]
inputs = [grpcclient.InferInput("WAIT_SECONDS", shape, "FP32")]
inputs[0].set_data_from_numpy(np.full(shape, delay_secs, dtype=np.float32))

with open(os.environ["SERVER_LOG"]) as f:
server_log = f.read()
self.assertNotIn("ValueError: wait_secs cannot be negative", server_log)

callback, response = self._generate_streaming_callback_and_response_pair()
self._triton.start_stream(callback)
self._triton.async_stream_infer(model_name, inputs)
time.sleep(2)
self._triton.stop_stream()

with open(os.environ["SERVER_LOG"]) as f:
server_log = f.read()
self.assertIn("ValueError: wait_secs cannot be negative", server_log)


if __name__ == "__main__":
unittest.main()
71 changes: 71 additions & 0 deletions qa/L0_backend_python/async_execute/test.sh
@@ -0,0 +1,71 @@
#!/bin/bash
# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

source ../../common/util.sh

RET=0

#
# Test execution overlapping on the same instance
#
rm -rf models && mkdir models
mkdir -p models/async_execute_decouple/1 && \
cp ../../python_models/async_execute_decouple/model.py models/async_execute_decouple/1 && \
cp ../../python_models/async_execute_decouple/config.pbtxt models/async_execute_decouple
mkdir -p models/async_execute_decouple_bls/1 && \
cp ../../python_models/async_execute_decouple_bls/model.py models/async_execute_decouple_bls/1 && \
cp ../../python_models/async_execute_decouple_bls/config.pbtxt models/async_execute_decouple_bls

TEST_LOG="concurrency_test.log"
SERVER_LOG="concurrency_test.server.log"
SERVER_ARGS="--model-repository=${MODELDIR}/async_execute/models --backend-directory=${BACKEND_DIR} --log-verbose=1"

run_server
if [ "$SERVER_PID" == "0" ]; then
echo -e "\n***\n*** Failed to start $SERVER\n***"
cat $SERVER_LOG
exit 1
fi

set +e
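# SERVER_LOG is passed into the test environment so test_model_raise_exception
# can scan the server log for the model's exception.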
SERVER_LOG=$SERVER_LOG python3 -m pytest --junitxml=concurrency_test.report.xml concurrency_test.py > $TEST_LOG 2>&1
if [ $? -ne 0 ]; then
echo -e "\n***\n*** async execute concurrency test FAILED\n***"
cat $TEST_LOG
RET=1
fi
set -e

kill $SERVER_PID
wait $SERVER_PID

if [ $RET -eq 1 ]; then
echo -e "\n***\n*** Async execute test FAILED\n***"
else
echo -e "\n***\n*** Async execute test Passed\n***"
fi
exit $RET
2 changes: 1 addition & 1 deletion qa/L0_backend_python/test.sh
@@ -414,7 +414,7 @@ if [[ "$TEST_JETSON" == "0" ]]; then
# [DLIS-5970] Disable io tests for Windows since GPU Tensors are not supported
# [DLIS-6122] Disable model_control & request_rescheduling tests for Windows since they require load/unload
if [[ ${TEST_WINDOWS} == 0 ]]; then
SUBTESTS+=" variants io python_based_backends"
SUBTESTS+=" variants io python_based_backends async_execute"
fi

for TEST in ${SUBTESTS}; do
46 changes: 46 additions & 0 deletions qa/python_models/async_execute_decouple/config.pbtxt
@@ -0,0 +1,46 @@
# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

backend: "python"
max_batch_size: 8

input [
{
name: "WAIT_SECONDS"
data_type: TYPE_FP32
dims: [ 1 ]
}
]
output [
{
name: "DUMMY_OUT"
data_type: TYPE_FP32
dims: [ 1 ]
}
]

instance_group [{ kind: KIND_CPU }]
model_transaction_policy { decoupled: True }
77 changes: 77 additions & 0 deletions qa/python_models/async_execute_decouple/model.py
@@ -0,0 +1,77 @@
# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio

import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
async def execute(self, requests):
processed_requests = []
async_tasks = []
for request in requests:
wait_secs_tensors = pb_utils.get_input_tensor_by_name(
request, "WAIT_SECONDS"
).as_numpy()
for wait_secs_tensor in wait_secs_tensors:
wait_secs = wait_secs_tensor[0]
if wait_secs < 0:
self.raise_value_error(requests)
async_tasks.append(asyncio.create_task(asyncio.sleep(wait_secs)))
processed_requests.append(
{
"response_sender": request.get_response_sender(),
"batch_size": wait_secs_tensors.shape[0],
}
)

# This decoupled execute should be scheduled to run in the background
# concurrently with other instances of decoupled execute, as long as the event
# loop is not blocked.
await asyncio.gather(*async_tasks)

for p_req in processed_requests:
response_sender = p_req["response_sender"]
batch_size = p_req["batch_size"]

output_tensors = pb_utils.Tensor(
"DUMMY_OUT", np.array([0 for i in range(batch_size)], np.float32)
)
response = pb_utils.InferenceResponse(output_tensors=[output_tensors])
response_sender.send(
response, flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
)

return None

def raise_value_error(self, requests):
        # TODO: A model should be able to raise an exception without first sending
        # the complete final flag; until then, close every request before raising.
for request in requests:
response_sender = request.get_response_sender()
response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
raise ValueError("wait_secs cannot be negative")
(The remaining two changed files, qa/python_models/async_execute_decouple_bls/model.py and config.pbtxt, did not load in this view.)
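
For orientation, a hypothetical sketch of what the async_execute_decouple_bls model could look like. It assumes `pb_utils.InferenceRequest.async_exec(decoupled=True)` can be awaited and returns the downstream decoupled model's responses; this is an illustration, not the committed file:

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    async def execute(self, requests):
        for request in requests:
            response_sender = request.get_response_sender()
            bls_request = pb_utils.InferenceRequest(
                model_name="async_execute_decouple",
                inputs=[pb_utils.get_input_tensor_by_name(request, "WAIT_SECONDS")],
                requested_output_names=["DUMMY_OUT"],
            )
            # Assumed API: awaiting the decoupled BLS call yields an iterable of
            # responses from the downstream decoupled model.
            bls_responses = await bls_request.async_exec(decoupled=True)
            for bls_response in bls_responses:
                if bls_response.has_error():
                    raise pb_utils.TritonModelException(bls_response.error().message())
                out = pb_utils.get_output_tensor_by_name(bls_response, "DUMMY_OUT")
                if out is not None:  # a final-flag-only response carries no tensors
                    response_sender.send(
                        pb_utils.InferenceResponse(
                            output_tensors=[pb_utils.Tensor("DUMMY_OUT", out.as_numpy())]
                        )
                    )
            response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
        return None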
