Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tanmayv25 committed Sep 13, 2023
1 parent de2b4c8 commit ed214e8
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/python/library/tritonclient/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from ._infer_input import InferInput
from ._infer_result import InferResult
from ._requested_output import InferRequestedOutput
from ._utils import CancelledError, raise_error, raise_error_grpc
from ._utils import raise_error, raise_error_grpc
except ModuleNotFoundError as error:
raise RuntimeError(
"The installation does not include grpc support. "
Expand Down
18 changes: 5 additions & 13 deletions src/python/library/tritonclient/grpc/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
from ._infer_result import InferResult
from ._infer_stream import _InferStream, _RequestIterator
from ._utils import (
CancelledError,
_get_inference_request,
_grpc_compression_type,
get_cancelled_error,
get_error_grpc,
raise_error,
raise_error_grpc,
Expand Down Expand Up @@ -1392,13 +1392,10 @@ def async_infer(
callback : function
Python function that is invoked once the request is completed.
The function must reserve the last two arguments (result, error)
to hold InferResult and InferenceServerException(or CancelledError)
to hold InferResult and InferenceServerException
objects respectively which will be provided to the function when
executing the callback. The ownership of these objects will be given
to the user. The 'error' would be None for a successful inference.
Note if the request is cancelled using the returned future object,
error provided to callback will be a CancelledError exception
object.
model_version: str
The version of the model to run inference. The default value
is an empty string which means then the server will choose
Expand Down Expand Up @@ -1471,9 +1468,6 @@ def async_infer(
See here for more details of future object:
https://grpc.github.io/grpc/python/grpc.html#grpc.Future
The callback will be invoked with
(result=None, error=CancelledError) for the requests that
were successfully cancelled.
Raises
------
Expand All @@ -1490,8 +1484,8 @@ def wrapped_callback(call_future):
result = InferResult(response)
except grpc.RpcError as rpc_error:
error = get_error_grpc(rpc_error)
except grpc.FutureCancelledError:
error = CancelledError()
except grpc.FutureCancelledError as err:
error = get_cancelled_error()
callback(result=result, error=error)

metadata = self._get_metadata(headers)
Expand Down Expand Up @@ -1545,12 +1539,10 @@ def start_stream(
Python function that is invoked upon receiving response from
the underlying stream. The function must reserve the last two
arguments (result, error) to hold InferResult and
InferenceServerException(or CancelledError) objects respectively
InferenceServerException objects respectively
which will be provided to the function when executing the callback.
The ownership of these objects will be given to the user. The 'error'
would be None for a successful inference.
Note if the stream is closed with cancel_requests set True, then
the error provided to callback will be a CancelledError object.
stream_timeout : float
Optional stream timeout (in seconds). The stream will be closed
Expand Down
12 changes: 7 additions & 5 deletions src/python/library/tritonclient/grpc/_infer_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from tritonclient.utils import *

from ._infer_result import InferResult
from ._utils import CancelledError, get_error_grpc, raise_error
from ._utils import get_cancelled_error, get_error_grpc, raise_error


class _InferStream:
Expand Down Expand Up @@ -65,9 +65,11 @@ def __del__(self):
self.close()

def close(self, cancel_requests=False):
"""Gracefully close underlying gRPC streams. Note that this call
blocks till response of all currently enqueued requests are not
received.
"""Gracefully close underlying gRPC streams.
If cancel_requests is set True, then client cancels all the
pending requests and closes the stream. If set False, the
call blocks till all the pending requests on the stream are
processed.
"""
if cancel_requests and self._response_iterator:
self._response_iterator.cancel()
Expand Down Expand Up @@ -157,7 +159,7 @@ def _process_response(self):
# circular wait
self._active = self._response_iterator.is_active()
if rpc_error.cancelled:
error = CancelledError()
error = get_cancelled_error()
else:
error = get_error_grpc(rpc_error)
self._callback(result=None, error=error)
Expand Down
16 changes: 12 additions & 4 deletions src/python/library/tritonclient/grpc/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@
from tritonclient.utils import *


class CancelledError(Exception):
"""Indicates that the issued operation was cancelled."""


def get_error_grpc(rpc_error):
"""Convert a gRPC error to an InferenceServerException.
Expand All @@ -54,6 +50,18 @@ def get_error_grpc(rpc_error):
)


def get_cancelled_error():
"""Get InferenceServerException object for a locally cancelled RPC.
Returns
-------
InferenceServerException
"""
return InferenceServerException(
msg="Locally cancelled by application!", status="StatusCode.CANCELLED"
)


def raise_error_grpc(rpc_error):
"""Raise an InferenceServerException from a gRPC error.
Expand Down
19 changes: 13 additions & 6 deletions src/python/library/tritonclient/grpc/aio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,10 +602,17 @@ async def infer(
cancellation if required. This can be attained
by following:
-------
call = await client.infer(..., get_call_obj=True)
call.cancel()
generator = client.infer(..., get_call_obj=True)
grpc_call = await anext(generator)
grpc_call.cancel()
-------
Returns
-------
async_generator
If get_call_obj is set True, then it generates the
streaming_call object before generating the inference
results.
"""

Expand Down Expand Up @@ -679,17 +686,17 @@ async def stream_infer(
The call object can be used to cancel the execution of the
ongoing stream and exit. This can be done like below:
-------
async_generator = await client.infer(..., get_call_obj=True)
streaming_call = await response_iterator.__next__()
async_generator = client.stream_infer(..., get_call_obj=True)
streaming_call = await anext(response_iterator)
streaming_call.cancel()
-------
Returns
-------
async_generator
Yield tuple holding (InferResult, InferenceServerException) objects.
If get_call_obj is set True, then it yields the streaming_call
object before yielding the tuples.
If get_call_obj is set True, then it first generates streaming_call
object associated with the call before generating these tuples.
Raises
------
Expand Down

0 comments on commit ed214e8

Please sign in to comment.