From a9f9c08867afe5d2f6bc6ebfc9423e152dcb4a23 Mon Sep 17 00:00:00 2001
From: Iman Tabrizian
Date: Fri, 29 Mar 2024 10:06:25 -0400
Subject: [PATCH] Add testing for iterative scheduler backlogged requests

---
 .../iterative_sequence_e2e.py                      | 37 ++++++++++++++------
 .../models/iterative_sequence/config.pbtxt         |  2 +-
 2 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/qa/L0_iterative_sequence/iterative_sequence_e2e.py b/qa/L0_iterative_sequence/iterative_sequence_e2e.py
index 378b6ebe82..3676a2f6b3 100755
--- a/qa/L0_iterative_sequence/iterative_sequence_e2e.py
+++ b/qa/L0_iterative_sequence/iterative_sequence_e2e.py
@@ -46,7 +46,7 @@
 MODEL_CONFIG_BASE = """
 {{
 "backend": "iterative_sequence",
-"max_batch_size": 4,
+"max_batch_size": 1,
 "input" : [
   {{
     "name": "INPUT",
@@ -103,7 +103,9 @@ def test_generate_stream(self):
                 self.assertEqual(res_count, data["OUTPUT"])
         self.assertEqual(0, res_count)
 
-    def test_grpc_stream(self, sequence_id=0, sequence_start=False):
+    def test_grpc_stream(
+        self, sequence_id=0, sequence_start=False, num_requests=1, validation=True
+    ):
         user_data = UserData()
         with grpcclient.InferenceServerClient("localhost:8001") as triton_client:
             triton_client.start_stream(callback=partial(callback, user_data))
@@ -111,22 +113,37 @@ def test_grpc_stream(self, sequence_id=0, sequence_start=False):
             inputs.append(grpcclient.InferInput("INPUT", [1, 1], "INT32"))
             inputs[0].set_data_from_numpy(np.array([[2]], dtype=np.int32))
 
-            triton_client.async_stream_infer(
-                model_name="iterative_sequence",
-                inputs=inputs,
-                sequence_id=sequence_id,
-                sequence_start=sequence_start,
-            )
-            res_count = 2
+            for _ in range(num_requests):
+                triton_client.async_stream_infer(
+                    model_name="iterative_sequence",
+                    inputs=inputs,
+                    sequence_id=sequence_id,
+                    sequence_start=sequence_start,
+                )
+            res_count = 2 * num_requests
             while res_count > 0:
                 data_item = user_data._completed_requests.get()
                 res_count -= 1
                 if type(data_item) == InferenceServerException:
                     raise data_item
                 else:
-                    self.assertEqual(res_count, data_item.as_numpy("OUTPUT")[0][0])
+                    if validation:
+                        self.assertEqual(
+                            res_count % 2, data_item.as_numpy("OUTPUT")[0][0]
+                        )
             self.assertEqual(0, res_count)
 
+    def test_backlog_fill(self):
+        # Load a config with a single sequence slot (max_batch_size 1) and
+        # send more requests than can be scheduled at once, so the extra
+        # requests exercise the scheduler's backlog.
+        config = r'"sequence_batching" : { "iterative_sequence" : true, "max_sequence_idle_microseconds": 8000000, "direct" : { "max_queue_delay_microseconds" : 10000000 }}'
+        with grpcclient.InferenceServerClient("localhost:8001") as triton_client:
+            triton_client.load_model(
+                "iterative_sequence", config=MODEL_CONFIG_BASE.format(config)
+            )
+        self.test_grpc_stream(num_requests=4, validation=False)
+
     def test_reschedule_error(self):
         # Use short idle timeout (< backend reschedule delay: 0.5s) so that
         # the backend won't be able to reschedule the request as the scheduler
diff --git a/qa/L0_iterative_sequence/models/iterative_sequence/config.pbtxt b/qa/L0_iterative_sequence/models/iterative_sequence/config.pbtxt
index d6e539007b..fbf8685291 100644
--- a/qa/L0_iterative_sequence/models/iterative_sequence/config.pbtxt
+++ b/qa/L0_iterative_sequence/models/iterative_sequence/config.pbtxt
@@ -24,7 +24,7 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 backend: "iterative_sequence"
-max_batch_size: 4
+max_batch_size: 1
 input [
   {
     name: "INPUT"
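
--
Reviewer note: a minimal standalone sketch (plain Python, no Triton required) of
why the new res_count % 2 assertion is equivalent to the old res_count check for
a single in-order request. The per-request OUTPUT countdown [1, 0] comes from the
existing test, which sends INPUT = 2; with several interleaved sequences the
response order is not guaranteed, which is presumably why test_backlog_fill
passes validation=False.

    # Per-request countdown for INPUT = 2, as asserted by the original test:
    #   self.assertEqual(res_count, data_item.as_numpy("OUTPUT")[0][0])
    num_requests = 1
    expected_outputs = [1, 0] * num_requests

    res_count = 2 * num_requests
    for output in expected_outputs:
        res_count -= 1
        # The patched assertion: the countdown modulo 2 matches each OUTPUT
        # value when responses arrive in order.
        assert res_count % 2 == output
    assert res_count == 0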