Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp Callback Groups #92

Merged
merged 6 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ada_feeding/ada_feeding/behaviors/move_to.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import py_trees
from pymoveit2 import MoveIt2, MoveIt2State
from pymoveit2.robots import kinova
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from rclpy.qos import QoSPresetProfiles
from sensor_msgs.msg import JointState
Expand Down Expand Up @@ -120,12 +121,16 @@ def __init__(
# Create MoveIt 2 interface for moving the Jaco arm. This must be done
# in __init__ and not setup since the MoveIt2 interface must be
# initialized before the ROS2 node starts spinning.
# Using ReentrantCallbackGroup to align with the examples from pymoveit2.
# TODO: Assess whether ReentrantCallbackGroup is necessary for MoveIt2.
callback_group = ReentrantCallbackGroup()
self.moveit2 = MoveIt2(
node=self.node,
joint_names=kinova.joint_names(),
base_link_name=kinova.base_link_name(),
end_effector_name="forkTip",
group_name=kinova.MOVE_GROUP_ARM,
callback_group=callback_group,
)

# Subscribe to the joint state and track the distance to goal while the
Expand Down
5 changes: 5 additions & 0 deletions ada_feeding/ada_feeding/behaviors/toggle_collision_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import py_trees
from pymoveit2 import MoveIt2
from pymoveit2.robots import kinova
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node

# Local imports
Expand Down Expand Up @@ -55,12 +56,16 @@ def __init__(
# Create MoveIt 2 interface for moving the Jaco arm. This must be done
# in __init__ and not setup since the MoveIt2 interface must be
# initialized before the ROS2 node starts spinning.
# Using ReentrantCallbackGroup to align with the examples from pymoveit2.
# TODO: Assess whether ReentrantCallbackGroup is necessary for MoveIt2.
callback_group = ReentrantCallbackGroup()
self.moveit2 = MoveIt2(
node=self.node,
joint_names=kinova.joint_names(),
base_link_name=kinova.base_link_name(),
end_effector_name="forkTip",
group_name=kinova.MOVE_GROUP_ARM,
callback_group=callback_group,
)

def update(self) -> py_trees.common.Status:
Expand Down
4 changes: 4 additions & 0 deletions ada_feeding/ada_feeding/watchdog/ft_sensor_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,15 @@ def __init__(self, node: Node) -> None:

# Subscribe to the FT sensor topic
self.ft_sensor_topic = self._node.resolve_topic_name("~/ft_topic")
self.ft_sensor_callback_group = (
rclpy.callback_groups.MutuallyExclusiveCallbackGroup()
)
self.ft_sensor_subscription = self._node.create_subscription(
WrenchStamped,
self.ft_sensor_topic,
self.__ft_sensor_callback,
rclpy.qos.QoSPresetProfiles.SENSOR_DATA.value,
callback_group=self.ft_sensor_callback_group,
)

def __ft_sensor_callback(self, msg: WrenchStamped) -> None:
Expand Down
2 changes: 2 additions & 0 deletions ada_feeding/scripts/ada_planning_scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def __init__(self) -> None:
self.load_parameters()

# Initialize the MoveIt2 interface
# Using ReentrantCallbackGroup to align with the examples from pymoveit2.
# TODO: Assess whether ReentrantCallbackGroup is necessary for MoveIt2.
callback_group = ReentrantCallbackGroup()
self.moveit2 = MoveIt2(
node=self,
Expand Down
12 changes: 10 additions & 2 deletions ada_feeding/scripts/ada_watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,16 @@ def __init__(self) -> None:
)

# Publish at the specified rate
# TODO: Consider making this a separate thread that runs at a fixed rate,
# to avoid the callback queue getting backed up if the function takes
# longer than the specified rate to run.
self.timer_callback_group = (
rclpy.callback_groups.MutuallyExclusiveCallbackGroup()
)
timer_period = 1.0 / self.publish_rate_hz.value # seconds
self.timer = self.create_timer(timer_period, self.__check_conditions)
self.timer = self.create_timer(
timer_period, self.__check_conditions, self.timer_callback_group
)

def __load_parameters(self) -> None:
"""
Expand Down Expand Up @@ -199,7 +207,7 @@ def main(args=None):
rclpy.init(args=args)

# Use a MultiThreadedExecutor to enable processing topics concurrently
executor = MultiThreadedExecutor()
executor = MultiThreadedExecutor(num_threads=3)

# Create and spin the node
ada_watchdog = ADAWatchdog()
Expand Down
2 changes: 0 additions & 2 deletions ada_feeding/scripts/create_action_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ def __init__(self) -> None:
"""
super().__init__("create_action_servers")

self._default_callback_group = rclpy.callback_groups.ReentrantCallbackGroup()

# Read the parameters that specify what action servers to create.
action_server_params = self.read_params()

Expand Down
29 changes: 21 additions & 8 deletions ada_feeding_perception/ada_feeding_perception/face_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from geometry_msgs.msg import Point
import numpy as np
import rclpy
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.executors import MultiThreadedExecutor
Expand Down Expand Up @@ -55,8 +56,6 @@ def __init__(
"""
super().__init__("face_detection")

self._default_callback_group = rclpy.callback_groups.ReentrantCallbackGroup()

# Read the parameters
# NOTE: These parameters are only read once. Any changes after the node
# is initialized will not be reflected.
Expand Down Expand Up @@ -108,6 +107,7 @@ def __init__(
SetBool,
"~/toggle_face_detection",
self.toggle_face_detection_callback,
callback_group=MutuallyExclusiveCallbackGroup(),
)

# Create the publishers
Expand All @@ -131,7 +131,11 @@ def __init__(
self.latest_img_msg_lock = threading.Lock()
image_topic = "~/image"
self.img_subscription = self.create_subscription(
get_img_msg_type(image_topic, self), image_topic, self.camera_callback, 1
get_img_msg_type(image_topic, self),
image_topic,
self.camera_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)

depth_buffer_size = depth_buffer_size.value
Expand All @@ -144,6 +148,7 @@ def __init__(
aligned_depth_topic,
self.depth_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)

# Subscribe to the camera info
Expand All @@ -152,7 +157,11 @@ def __init__(
# (updated with subscription)
self.camera_matrix = [614, 0, 312, 0, 614, 223, 0, 0, 1]
self.camera_info_subscription = self.create_subscription(
CameraInfo, "~/camera_info", self.camera_info_callback, 1
CameraInfo,
"~/camera_info",
self.camera_info_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)

def read_params(
Expand Down Expand Up @@ -504,7 +513,9 @@ def run(self) -> None:
with self.latest_img_msg_lock:
rgb_msg = self.latest_img_msg
if rgb_msg is None:
self.get_logger().warn("No RGB image message received.", throttle_duration_sec=1)
self.get_logger().warn(
"No RGB image message received.", throttle_duration_sec=1
)
continue

# Detect the largest face in the RGB image
Expand All @@ -522,8 +533,10 @@ def run(self) -> None:
if is_face_detected_depth:
# Get the 3d location of the mouth
face_detection_msg.detected_mouth_center.header = rgb_msg.header
face_detection_msg.detected_mouth_center.point = self.get_stomion_point(
img_mouth_center[0], img_mouth_center[1], depth_mm
face_detection_msg.detected_mouth_center.point = (
self.get_stomion_point(
img_mouth_center[0], img_mouth_center[1], depth_mm
)
)
else:
is_face_detected = False
Expand All @@ -540,7 +553,7 @@ def main(args=None):
rclpy.init(args=args)

face_detection = FaceDetectionNode()
executor = MultiThreadedExecutor()
executor = MultiThreadedExecutor(num_threads=4)

# Spin in the background since detecting faces will block
# the main thread
Expand Down
4 changes: 3 additions & 1 deletion ada_feeding_perception/ada_feeding_perception/republisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# Third-party imports
from rcl_interfaces.msg import ParameterDescriptor, ParameterType
import rclpy
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node

Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(self) -> None:
topic=self.from_topics[i],
callback=callback,
qos_profile=1, # TODO: we should get and mirror the QOS profile of the from_topic
callback_group=MutuallyExclusiveCallbackGroup(),
)
self.subs.append(subscriber)

Expand Down Expand Up @@ -183,7 +185,7 @@ def main(args=None):
republisher = Republisher()

# Use a MultiThreadedExecutor to enable processing goals concurrently
executor = MultiThreadedExecutor()
executor = MultiThreadedExecutor(num_threads=len(republisher.subs))

rclpy.spin(republisher, executor=executor)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rclpy
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.action.server import ServerGoalHandle
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.parameter import Parameter
Expand Down Expand Up @@ -58,8 +59,6 @@ def __init__(self) -> None:

super().__init__("segment_from_point")

self._default_callback_group = rclpy.callback_groups.ReentrantCallbackGroup()

# Check if cuda is available
self.device = "cuda" if torch.cuda.is_available() else "cpu"

Expand Down Expand Up @@ -91,6 +90,7 @@ def __init__(self) -> None:
"~/camera_info",
self.camera_info_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self.camera_info = None
self.camera_info_lock = threading.Lock()
Expand All @@ -106,6 +106,7 @@ def __init__(self) -> None:
aligned_depth_topic,
self.depth_image_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self.latest_depth_img_msg = None
self.latest_depth_img_msg_lock = threading.Lock()
Expand All @@ -117,6 +118,7 @@ def __init__(self) -> None:
image_topic,
self.image_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self.latest_img_msg = None
self.latest_img_msg_lock = threading.Lock()
Expand Down Expand Up @@ -244,6 +246,7 @@ def initialize_food_segmentation(self, model_name: str, model_path: str) -> None
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self.get_logger().info("...Done!")

Expand Down Expand Up @@ -509,7 +512,7 @@ def main(args=None):
segment_from_point = SegmentFromPointNode()

# Use a MultiThreadedExecutor to enable processing goals concurrently
executor = MultiThreadedExecutor()
executor = MultiThreadedExecutor(num_threads=5)

rclpy.spin(segment_from_point, executor=executor)

Expand Down