Skip to content

Commit

Permalink
Use gRPC AyncIO API for better performance
Browse files Browse the repository at this point in the history
  • Loading branch information
mocsharp committed Feb 15, 2025
1 parent 319eb88 commit be84362
Show file tree
Hide file tree
Showing 19 changed files with 437 additions and 165 deletions.
79 changes: 77 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,67 @@
"order": 1
}
},
{
"name": "(debugpy) grpc_endoscopy_tool_tracking/python (cloud)",
"type": "debugpy",
"request": "launch",
"preLaunchTask": "Build grpc_endoscopy_tool_tracking",
"program": "${workspaceFolder}/applications/distributed/grpc/grpc_endoscopy_tool_tracking/python/cloud/app_cloud_main.py",
"args": [
"--data",
"${env:HOLOHUB_DATA_DIR}/endoscopy"
],
"cwd": "${workspaceFolder}/build/grpc_endoscopy_tool_tracking/",
"env": {
"PYTHONPATH": "${workspaceFolder}/build/grpc_endoscopy_tool_tracking/python/lib:${workspaceFolder}:${env:PYTHONPATH}"
},
"presentation": {
"group": "grpc_endoscopy_tool_tracking",
"hidden": true
}
},
{
"name": "(pythoncpp) grpc_endoscopy_tool_tracking/python (cloud)",
"type": "pythoncpp",
"request": "launch",
"pythonLaunchName": "(debugpy) grpc_endoscopy_tool_tracking/python (cloud)",
"cppConfig": "default (gdb) Attach",
"presentation": {
"hidden": false,
"group": "grpc_endoscopy_tool_tracking",
"order": 2
}
},
{
"name": "(debugpy) grpc_endoscopy_tool_tracking/python (edge)",
"type": "debugpy",
"request": "launch",
"preLaunchTask": "Build grpc_endoscopy_tool_tracking (delay 3s)",
"program": "${workspaceFolder}/applications/distributed/grpc/grpc_endoscopy_tool_tracking/python/edge/app_edge_main.py",
"args": [
"--data",
"${env:HOLOHUB_DATA_DIR}/endoscopy"
],
"cwd": "${workspaceFolder}/build/grpc_endoscopy_tool_tracking/",
"env": {
"PYTHONPATH": "${workspaceFolder}/build/grpc_endoscopy_tool_tracking/python/lib:${workspaceFolder}:${env:PYTHONPATH}"
},
"presentation": {
"hidden": true
}
},
{
"name": "(pythoncpp) grpc_endoscopy_tool_tracking/python (edge)",
"type": "pythoncpp",
"request": "launch",
"pythonLaunchName": "(debugpy) grpc_endoscopy_tool_tracking/python (edge)",
"cppConfig": "default (gdb) Attach",
"presentation": {
"hidden": false,
"group": "grpc_endoscopy_tool_tracking",
"order": 2
}
},
//#endregion grpc_endoscopy_tool_tracking

//#region grpc_h264_endoscopy_tool_tracking
Expand Down Expand Up @@ -1308,7 +1369,21 @@
"presentation": {
"hidden": false,
"group": "grpc_endoscopy_tool_tracking",
"order": 11
"order": 12
}
},
{
"name": "(compound) grpc_endoscopy_tool_tracking/python (cloud & edge)",
"configurations": [
"(pythoncpp) grpc_endoscopy_tool_tracking/python (cloud)",
"(pythoncpp) grpc_endoscopy_tool_tracking/python (edge)"
],
"preLaunchTask": "Build grpc_endoscopy_tool_tracking",
"stopAll": true,
"presentation": {
"hidden": false,
"group": "grpc_endoscopy_tool_tracking",
"order": 12
}
},
{
Expand All @@ -1322,7 +1397,7 @@
"presentation": {
"hidden": false,
"group": "grpc_h264_endoscopy_tool_tracking",
"order": 11
"order": 13
}
},
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,17 @@ The following launch profiles are available:
- **(gdb) grpc_endoscopy_tool_tracking/cpp (edge)**: Launch the gRPC client.


#### Python

The following launch profiles are available:

- **(compound) grpc_endoscopy_tool_tracking/python (cloud & edge)**: Launch both the gRPC server and the client.
- **(pythoncpp) grpc_endoscopy_tool_tracking/python (cloud)**: Launch the gRPC server.
- **(pythoncpp) grpc_endoscopy_tool_tracking/python (edge)**: Launch the gRPC client.

## Limitations & Known Issues

### C++
- The connection between the server and the client is controlled by `rpc_timeout`. If no data is received or sent within the configured time, it assumes the call has been completed and hangs up. The `rpc_timeout` value can be configured in the [endoscopy_tool_tracking.yaml](./cpp/endoscopy_tool_tracking.yaml) file with a default of 5 seconds. Increasing this value may help on a slow network.
- The server can serve one request at any given time. Any subsequent call receives a `grpc::StatusCode::RESOURCE_EXHAUSTED` status.
- When debugging using the compound profile, the server may not be ready to serve, resulting in errors with the client application. When this happens, open [tasks.json](../../../../.vscode/tasks.json), find `Build grpc_endoscopy_tool_tracking (delay 3s)`, and adjust the `command` field with a higher sleep value.
Expand All @@ -145,9 +154,11 @@ The following launch profiles are available:
[error] [program.cpp:614] Event notification 2 for entity [video_in__outgoing_requests] with id [33] received in an unexpected state [Origin]
```

### Python
- The client may not exit the application correctly if it experiences any errors; use CTRL+C to exit the application if needed.

## Containerize the application

To containerize the application, first install [Holoscan CLI](https://docs.nvidia.com/holoscan/sdk-user-guide/holoscan_packager.html), build the application using `./dev_container build_and_install grpc_endoscopy_tool_tracking`, run the `package-app.sh` script in the [cpp](./cpp/package-app.sh) directory and then follow the generated output to package and run the application.
To containerize the application, first install [Holoscan CLI](https://docs.nvidia.com/holoscan/sdk-user-guide/holoscan_packager.html), build the application using `./dev_container build_and_install grpc_endoscopy_tool_tracking`, run the `package-app.sh` script in the [cpp](./cpp/package-app.sh) directory or the [python](./python/package-app.sh) directory and then follow the generated output to package and run the application.

Refer to the [Packaging Holoscan Applications](https://docs.nvidia.com/holoscan/sdk-user-guide/holoscan_packager.html) section of the [Holoscan User Guide](https://docs.nvidia.com/holoscan/sdk-user-guide/) to learn more about installing the Holoscan CLI or packaging your application using Holoscan CLI.
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,23 @@ cmake_minimum_required(VERSION 3.20)

find_package(holoscan 2.6 REQUIRED CONFIG
PATHS "/opt/nvidia/holoscan" "/workspace/holoscan-sdk/install")

install(
DIRECTORY cloud edge
DESTINATION bin/grpc_endoscopy_tool_tracking/python
)

install(
FILES endoscopy_tool_tracking.yaml
DESTINATION bin/grpc_endoscopy_tool_tracking/python
)

install(
FILES requirements.txt
DESTINATION bin/grpc_endoscopy_tool_tracking/python/cloud
)

install(
FILES requirements.txt
DESTINATION bin/grpc_endoscopy_tool_tracking/python/edge
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import os
import signal
Expand All @@ -22,22 +23,21 @@
from queue import Queue

from endoscopy_tool_tracking import EndoscopyToolTrackingPipeline
from operators.grpc_operators.python.server.grpc_service import GrpcService

from operators.grpc_operators.python.server.application_factory import (
ApplicationFactory,
ApplicationInstance,
)
from operators.grpc_operators.python.server.grpc_service import GrpcService

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def signal_handler(sig, frame):
logger.warning(f"Caught signal {sig}. Stopping services...")
async def signal_handler():
logger.warning("Stopping services...")
grpc_service = GrpcService()
grpc_service.stop()
sys.exit(0)
await grpc_service.stop()


def _create_application_instance(args, request_queue: Queue, response_queue: Queue):
Expand All @@ -63,7 +63,7 @@ def parse_arguments():
parser.add_argument(
"-d",
"--data",
default=os.environ.get("HOLOSCAN_INPUT_PATH", default_data_path),
default=os.environ.get("HOLOSCAN_INPUT_PATH", f"{os.getcwd()}/data/endoscopy"),
help=("Set the data path (default: %(default)s)."),
)
parser.add_argument(
Expand All @@ -75,9 +75,10 @@ def parse_arguments():
return parser.parse_args()


if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
default_data_path = f"{os.getcwd()}/data/endoscopy"
async def main():
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(signal_handler()))

args = parse_arguments()

if not os.path.isdir(args.data):
Expand All @@ -95,4 +96,8 @@ def parse_arguments():

grpc_service = GrpcService()
grpc_service.initialize(args.port, application_factory)
grpc_service.start()
await grpc_service.start()


if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,5 @@ def compose(self):
# Lastly, we connect the results from the tool tracking postprocessor to the
# GrpcServerResponseOp so the pipeline can return the results back to the client
self.add_flow(tool_tracking_postprocessor, self.grpc_response_op, {("out", "input")})

self.composed = True
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
# limitations under the License.

import argparse
import asyncio
import logging
import os
import sys
from pathlib import Path

import yaml
from app_edge_single_fragment import AppEdgeSingleFragment
from holoscan.schedulers import MultiThreadScheduler
from holoscan.core import Tracker

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -35,21 +35,7 @@ def parse_arguments():
return args.data, args.config


def parse_config(config_path):
fragment_mode = False
benchmarking = False
try:
with open(config_path, "r") as file:
config = yaml.safe_load(file)
application = config.get("application", {})
fragment_mode = application.get("multifragment", False)
benchmarking = application.get("benchmarking", False)
except Exception as e:
logger.error(f"Error parsing configuration file: {e}")
return fragment_mode, benchmarking


def main():
async def main():
data_directory, config_path = parse_arguments()

if not data_directory:
Expand All @@ -67,41 +53,16 @@ def main():
if not config_path:
config_path = Path(sys.argv[0]).parent.parent / "endoscopy_tool_tracking.yaml"

fragment_mode, benchmarking = parse_config(config_path)

if fragment_mode:
# logger.info("Running application in multi-fragment mode")
# app = AppEdgeMultiFragment(data_directory)
pass
else:
logger.info("Running application in single fragment mode")
app = AppEdgeSingleFragment(data_directory)

if benchmarking:
logger.info("Benchmarking enabled")
trackers = app.track_distributed() if fragment_mode else app.track()

app = AppEdgeSingleFragment(data_directory)
app.config(str(config_path))
app.scheduler(
MultiThreadScheduler(
app,
worker_thread_number=5,
check_recession_period_ms=0.0,
stop_on_deadlock=True,
stop_on_deadlock_timeout=500,
name="multithread_scheduler",
)
)
app.run()

if benchmarking:
if fragment_mode:
for name, tracker in trackers.items():
logger.info(f"Fragment: {name}")
tracker.logger.info()
else:
trackers.logger.info()
with Tracker(app) as trackers:
future = app.run_async()
await app.start_streaming_client()

future.result()
trackers.print()


if __name__ == "__main__":
main()
asyncio.get_event_loop().run_until_complete(main())
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.


import asyncio
import logging

from holoscan.conditions import AsynchronousCondition
Expand All @@ -27,7 +28,7 @@
from operators.grpc_operators.python.common.asynchronous_condition_queue import (
AsynchronousConditionQueue,
)
from operators.grpc_operators.python.common.conditional_variable_queue import ConditionVariableQueue
from operators.grpc_operators.python.common.asyncio_queue import AsyncIoQueue


class AppEdgeSingleFragment(Application):
Expand All @@ -40,7 +41,7 @@ def __init__(self, data_path: str):

def __del__(self):
if self.entity_client_service:
self.entity_client_service.stop_entity_stream()
asyncio.shield(self.entity_client_service.stop_entity_stream())

def compose(self):
width = 854
Expand All @@ -49,7 +50,7 @@ def compose(self):
source_num_blocks = 2

self.condition = AsynchronousCondition(self, name="response_available_condition")
self.request_queue = ConditionVariableQueue(self, name="request_queue")
self.request_queue = AsyncIoQueue(self, name="request_queue")
self.response_queue = AsynchronousConditionQueue(
self, name="response_queue", condition=self.condition
)
Expand Down Expand Up @@ -106,11 +107,15 @@ def compose(self):
self.add_flow(incoming_responses, visualizer_op, {("output", "receivers")})

self.entity_client_service = EntityClientService(
self.from_config("grpc_client.server_address"),
self.from_config("grpc_client.interrupt"),
str(self.from_config("grpc_client.server_address")),
str(self.from_config("grpc_client.interrupt")),
self.request_queue,
self.response_queue,
replayer,
visualizer_op,
)
self.entity_client_service.start_entity_stream()

async def start_streaming_client(self):
while self.entity_client_service is None:
await asyncio.sleep(0.1)

await self.entity_client_service.start_entity_stream()
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ application:
version: 1.0
inputFormats: []
outputFormats: ["screen"]
multifragment: false # default: false, true to run in multi-fragment mode, false otherwise
benchmarking: false # default: false, true to enable Data Flow Benchmarking, false otherwise
grpc_health_check: false # default: false, true to enable gRPC health check, false otherwise

resources:
Expand All @@ -36,7 +34,7 @@ replayer:
basename: "surgical_video"
frame_rate: 0 # as specified in timestamps
repeat: false # default: false
realtime: true # default: true
realtime: true # default: true
count: 0 # default: 0 (no frame count restriction)

format_converter:
Expand Down
Loading

0 comments on commit be84362

Please sign in to comment.