-
Notifications
You must be signed in to change notification settings - Fork 3
Actionable and serviced APIs
These APIs wrap those in rclpy.client
and rclpy.action.client
to provide a simpler UX when dealing with ROS 2 actions and services.
Actionable and serviced APIs abstract ROS 2 action and service calls behind an interface that resembles that of remote procedure calls. These can be invoked either synchronously or asynchronously. When used asynchronously, serviced APIs return plain futures whereas actionable APIs return action futures. Action futures build on the notion of a future to track actions' feedback, status, and result.
Both abstractions are well integrated with ROS 2 aware scopes and processes.
Note
The following snippets make use of standard ROS 2 examples
and example_interfaces
.
You can use a serviced API as you would use any other callable:
import argparse
from example_interfaces.srv import AddTwoInts
from bdai_ros2_wrappers.service import Serviced, ServiceTimeout, ServiceError
import bdai_ros2_wrappers.process as ros_process
def cli() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("a", type=int)
parser.add_argument("b", type=int)
return parser
@ros_process.main(cli())
def main(args: argparse.Namespace) -> None:
add_two_ints = Serviced(AddTwoInts, "add_two_ints")
if not add_two_ints.wait_for_service(timeout_sec=5.0):
print(f"No {add_two_ints.service_name} services found")
return
try:
print(f"Computing {args.a} + {args.b}...")
result = add_two_ints(AddTwoInts.Request(a=args.a, b=args.b), timeout_sec=5.0)
print("Result is", result.sum)
except ServiceTimeout:
print("Computation timed out")
except ServiceError as e:
print(f"Computation failed: {e}")
if __name__ == "__main__":
main()
Note
You may use servers in the examples_rclpy_minimal_service
package to test this.
Serviced API calls are synchronous by default. This can also be made explicit by calling synchronously()
on them instead e.g. add_two_ints.synchronously()
.
All service outcomes other than nominal success are signaled using exceptions. An optional timeout prevents calling (and blocking on) a service request indefinitely.
You can get a future service response instead of blocking on call too:
import argparse
from example_interfaces.srv import AddTwoInts
from bdai_ros2_wrappers.service import Serviced
from bdai_ros2_wrappers.futures import wait_for_future
import bdai_ros2_wrappers.process as ros_process
def cli() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("a", type=int)
parser.add_argument("b", type=int)
return parser
@ros_process.main(cli())
def main(args: argparse.Namespace) -> None:
add_two_ints = Serviced(AddTwoInts, "add_two_ints")
if not add_two_ints.wait_for_service(timeout_sec=5.0):
print(f"No {add_two_ints.service_name} services found")
return
print(f"Computing {args.a} + {args.b}...")
future = add_two_ints.asynchronously(AddTwoInts.Request(a=args.a, b=args.b))
if not wait_for_future(future, timeout_sec=5.0):
print("Computation did not complete in time")
future.cancel()
return
result = future.result()
print("Result is", result.sum)
if __name__ == "__main__":
main()
Note
You may use servers in the examples_rclpy_minimal_service
package to test this.
Service response must be waited on, either explicitly and with a timeout or implicitly by early result request. Note fetching the future call result may raise.
You can use an actionable API as you would use any other callable:
import argparse
from example_interfaces.action import Fibonacci
from bdai_ros2_wrappers.action import Actionable
from bdai_ros2_wrappers.action import (
ActionTimeout, ActionRejected, ActionCancelled, ActionAborted
)
import bdai_ros2_wrappers.process as ros_process
def cli() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("order", type=int)
return parser
@ros_process.main(cli())
def main(args: argparse.Namespace) -> None:
compute_fibonacci_sequence = Actionable(Fibonacci, "fibonacci")
if not compute_fibonacci_sequence.wait_for_server(timeout_sec=5.0):
print(f"No {compute_fibonacci_sequence.action_name} action server found")
return
try:
print(f"Computing Fibonacci sequence for order N = {args.order}...")
result = compute_fibonacci_sequence(Fibonacci.Goal(order=args.order), timeout_sec=5.0)
print("Sequence is", result.sequence)
except ActionRejected:
print("Computation rejected")
except ActionTimeout:
print("Computation timed out")
except ActionAborted:
print("Computation aborted")
except ActionCancelled:
print("Computation cancelled")
if __name__ == "__main__":
main()
Note
You may use servers in the examples_rclpy_minimal_action_server
package to test this.
Actionable API calls are synchronous by default. This can also be made explicit by calling synchronously()
on them instead e.g. compute_fibonacci_sequence.synchronously()
.
All action outcomes other than nominal success are signaled using exceptions. Action feedback is ignored unless a callback is specified on call.
An optional timeout prevents pursuing (and blocking on) an action indefinitely.
You can get a future to an ongoing action instead of blocking on it:
import argparse
from example_interfaces.action import Fibonacci
from bdai_ros2_wrappers.action import Actionable
from bdai_ros2_wrappers.futures import wait_for_future
import bdai_ros2_wrappers.process as ros_process
def cli() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("order", type=int)
return parser
@ros_process.main(cli())
def main(args: argparse.Namespace) -> None:
compute_fibonacci_sequence = Actionable(Fibonacci, "fibonacci")
if not compute_fibonacci_sequence.wait_for_server(timeout_sec=5.0):
print(f"No {compute_fibonacci_sequence.action_name} action server found")
return
print(f"Computing Fibonacci sequence for order N = {args.order}...")
action = compute_fibonacci_sequence.asynchronously(
Fibonacci.Goal(order=args.order), track_feedback=True
)
wait_for_future(action.acknowledgement, timeout_sec=5.0)
if not action.acknowledged or not action.accepted:
print("Computation rejected")
return
for feedback in action.feedback_stream(timeout_sec=5.0):
print(f"Partial sequence is", feedback.sequence)
if not wait_for_future(action.finalization, timeout_sec=5.0):
print("Computation did not complete in time")
action.cancel()
return
if action.succeeded:
print("Sequence is", action.result.sequence)
elif action.aborted:
print("Computation aborted")
elif action.cancelled:
print("Computation cancelled")
else:
print("Internal server error")
if __name__ == "__main__":
main()
Note
You may use servers in the examples_rclpy_minimal_action_server
package to test this.
Action status must be checked explicitly, and timely before attempting to access an action's result or feedback (which may not be there yet). Action acknowledgement and finalization futures can help synchronization. Action feedback streaming simplifies (soft) real-time action monitoring.
Read up bdai_ros2_wrappers
API documentation for further reference on all the features that actionable and serviced APIs offer.