From 0b6eea6774d20746c42d6b5d30bfed7724f3a7ab Mon Sep 17 00:00:00 2001 From: augustelalande Date: Sat, 16 Sep 2023 22:37:53 -0400 Subject: [PATCH 1/4] prevent deadlock on synchronous calls Signed-off-by: augustelalande --- rclpy/rclpy/action/client.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rclpy/rclpy/action/client.py b/rclpy/rclpy/action/client.py index bdde81f9f..947070c6e 100644 --- a/rclpy/rclpy/action/client.py +++ b/rclpy/rclpy/action/client.py @@ -400,7 +400,8 @@ def unblock(future): send_goal_future = self.send_goal_async(goal, **kwargs) send_goal_future.add_done_callback(unblock) - event.wait() + if not send_goal_future.done(): + event.wait() if send_goal_future.exception() is not None: raise send_goal_future.exception() @@ -475,7 +476,8 @@ def unblock(future): future = self._cancel_goal_async(goal_handle) future.add_done_callback(unblock) - event.wait() + if not future.done(): + event.wait() if future.exception() is not None: raise future.exception() return future.result() @@ -527,7 +529,8 @@ def unblock(future): future = self._get_result_async(goal_handle) future.add_done_callback(unblock) - event.wait() + if not future.done(): + event.wait() if future.exception() is not None: raise future.exception() return future.result() From b270cdb783b11360b02c3536a4f69e8f6568f11c Mon Sep 17 00:00:00 2001 From: augustelalande Date: Sat, 16 Sep 2023 23:44:16 -0400 Subject: [PATCH 2/4] lock access to pending_requests Signed-off-by: augustelalande --- rclpy/rclpy/action/client.py | 168 +++++++++++++++++++---------------- 1 file changed, 90 insertions(+), 78 deletions(-) diff --git a/rclpy/rclpy/action/client.py b/rclpy/rclpy/action/client.py index 947070c6e..df6836d67 100644 --- a/rclpy/rclpy/action/client.py +++ b/rclpy/rclpy/action/client.py @@ -182,6 +182,8 @@ def __init__( self._node.add_waitable(self) self._logger = self._node.get_logger().get_child('action_client') + self._lock = threading.Lock() + def
_generate_random_uuid(self): return UUID(uuid=list(uuid.uuid4().bytes)) @@ -198,15 +200,16 @@ def _remove_pending_request(self, future, pending_requests): :return: The sequence number associated with the removed future, or None if the future was not found in the list. """ - for seq, req_future in list(pending_requests.items()): - if future == req_future: - try: - del pending_requests[seq] - except KeyError: - pass - else: - self.remove_future(future) - return seq + with self._lock: + for seq, req_future in list(pending_requests.items()): + if future == req_future: + try: + del pending_requests[seq] + except KeyError: + pass + else: + self.remove_future(future) + return seq return None def _remove_pending_goal_request(self, future): @@ -286,45 +289,48 @@ async def execute(self, taken_data): """ if 'goal' in taken_data: sequence_number, goal_response = taken_data['goal'] - if sequence_number in self._goal_sequence_number_to_goal_id: - goal_handle = ClientGoalHandle( - self, - self._goal_sequence_number_to_goal_id[sequence_number], - goal_response) - - if goal_handle.accepted: - goal_uuid = bytes(goal_handle.goal_id.uuid) - if goal_uuid in self._goal_handles: - raise RuntimeError( - 'Two goals were accepted with the same ID ({})'.format(goal_handle)) - self._goal_handles[goal_uuid] = weakref.ref(goal_handle) - - self._pending_goal_requests[sequence_number].set_result(goal_handle) - else: - self._logger.warning( - 'Ignoring unexpected goal response. 
There may be more than ' - f"one action server for the action '{self._action_name}'" - ) + with self._lock: + if sequence_number in self._goal_sequence_number_to_goal_id: + goal_handle = ClientGoalHandle( + self, + self._goal_sequence_number_to_goal_id[sequence_number], + goal_response) + + if goal_handle.accepted: + goal_uuid = bytes(goal_handle.goal_id.uuid) + if goal_uuid in self._goal_handles: + raise RuntimeError( + f'Two goals were accepted with the same ID ({goal_handle})') + self._goal_handles[goal_uuid] = weakref.ref(goal_handle) + + self._pending_goal_requests[sequence_number].set_result(goal_handle) + else: + self._logger.warning( + 'Ignoring unexpected goal response. There may be more than ' + f"one action server for the action '{self._action_name}'" + ) if 'cancel' in taken_data: sequence_number, cancel_response = taken_data['cancel'] - if sequence_number in self._pending_cancel_requests: - self._pending_cancel_requests[sequence_number].set_result(cancel_response) - else: - self._logger.warning( - 'Ignoring unexpected cancel response. There may be more than ' - f"one action server for the action '{self._action_name}'" - ) + with self._lock: + if sequence_number in self._pending_cancel_requests: + self._pending_cancel_requests[sequence_number].set_result(cancel_response) + else: + self._logger.warning( + 'Ignoring unexpected cancel response. There may be more than ' + f"one action server for the action '{self._action_name}'" + ) if 'result' in taken_data: sequence_number, result_response = taken_data['result'] - if sequence_number in self._pending_result_requests: - self._pending_result_requests[sequence_number].set_result(result_response) - else: - self._logger.warning( - 'Ignoring unexpected result response. 
There may be more than ' - f"one action server for the action '{self._action_name}'" - ) + with self._lock: + if sequence_number in self._pending_result_requests: + self._pending_result_requests[sequence_number].set_result(result_response) + else: + self._logger.warning( + 'Ignoring unexpected result response. There may be more than ' + f"one action server for the action '{self._action_name}'" + ) if 'feedback' in taken_data: feedback_msg = taken_data['feedback'] @@ -438,22 +444,24 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None): request = self._action_type.Impl.SendGoalService.Request() request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid request.goal = goal - sequence_number = self._client_handle.send_goal_request(request) - if sequence_number in self._pending_goal_requests: - raise RuntimeError( - 'Sequence ({}) conflicts with pending goal request'.format(sequence_number)) - - if feedback_callback is not None: - # TODO(jacobperron): Move conversion function to a general-use package - goal_uuid = bytes(request.goal_id.uuid) - self._feedback_callbacks[goal_uuid] = feedback_callback - - future = Future() - self._pending_goal_requests[sequence_number] = future - self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id - future.add_done_callback(self._remove_pending_goal_request) - # Add future so executor is aware - self.add_future(future) + + with self._lock: + sequence_number = self._client_handle.send_goal_request(request) + if sequence_number in self._pending_goal_requests: + raise RuntimeError( + 'Sequence ({}) conflicts with pending goal request'.format(sequence_number)) + + if feedback_callback is not None: + # TODO(jacobperron): Move conversion function to a general-use package + goal_uuid = bytes(request.goal_id.uuid) + self._feedback_callbacks[goal_uuid] = feedback_callback + + future = Future() + self._pending_goal_requests[sequence_number] = future + 
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id + future.add_done_callback(self._remove_pending_goal_request) + # Add future so executor is aware + self.add_future(future) return future @@ -497,16 +505,18 @@ def _cancel_goal_async(self, goal_handle): cancel_request = CancelGoal.Request() cancel_request.goal_info.goal_id = goal_handle.goal_id - sequence_number = self._client_handle.send_cancel_request(cancel_request) - if sequence_number in self._pending_cancel_requests: - raise RuntimeError( - 'Sequence ({}) conflicts with pending cancel request'.format(sequence_number)) - future = Future() - self._pending_cancel_requests[sequence_number] = future - future.add_done_callback(self._remove_pending_cancel_request) - # Add future so executor is aware - self.add_future(future) + with self._lock: + sequence_number = self._client_handle.send_cancel_request(cancel_request) + if sequence_number in self._pending_cancel_requests: + raise RuntimeError( + 'Sequence ({}) conflicts with pending cancel request'.format(sequence_number)) + + future = Future() + self._pending_cancel_requests[sequence_number] = future + future.add_done_callback(self._remove_pending_cancel_request) + # Add future so executor is aware + self.add_future(future) return future @@ -550,17 +560,19 @@ def _get_result_async(self, goal_handle): result_request = self._action_type.Impl.GetResultService.Request() result_request.goal_id = goal_handle.goal_id - sequence_number = self._client_handle.send_result_request(result_request) - if sequence_number in self._pending_result_requests: - raise RuntimeError( - 'Sequence ({}) conflicts with pending result request'.format(sequence_number)) - - future = Future() - self._pending_result_requests[sequence_number] = future - self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id - future.add_done_callback(self._remove_pending_result_request) - # Add future so executor is aware - self.add_future(future) + + with 
self._lock: + sequence_number = self._client_handle.send_result_request(result_request) + if sequence_number in self._pending_result_requests: + raise RuntimeError( + 'Sequence ({}) conflicts with pending result request'.format(sequence_number)) + + future = Future() + self._pending_result_requests[sequence_number] = future + self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id + future.add_done_callback(self._remove_pending_result_request) + # Add future so executor is aware + self.add_future(future) return future From 9d49bf721df2bde35a7fed085b6375cdb4efa900 Mon Sep 17 00:00:00 2001 From: augustelalande Date: Sat, 16 Sep 2023 23:59:42 -0400 Subject: [PATCH 3/4] minimize lock time Signed-off-by: augustelalande --- rclpy/rclpy/action/client.py | 76 +++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 36 deletions(-) diff --git a/rclpy/rclpy/action/client.py b/rclpy/rclpy/action/client.py index df6836d67..86ecdc659 100644 --- a/rclpy/rclpy/action/client.py +++ b/rclpy/rclpy/action/client.py @@ -289,48 +289,52 @@ async def execute(self, taken_data): """ if 'goal' in taken_data: sequence_number, goal_response = taken_data['goal'] - with self._lock: - if sequence_number in self._goal_sequence_number_to_goal_id: - goal_handle = ClientGoalHandle( - self, - self._goal_sequence_number_to_goal_id[sequence_number], - goal_response) - - if goal_handle.accepted: - goal_uuid = bytes(goal_handle.goal_id.uuid) - if goal_uuid in self._goal_handles: - raise RuntimeError( - f'Two goals were accepted with the same ID ({goal_handle})') - self._goal_handles[goal_uuid] = weakref.ref(goal_handle) - - self._pending_goal_requests[sequence_number].set_result(goal_handle) - else: - self._logger.warning( - 'Ignoring unexpected goal response. 
There may be more than ' - f"one action server for the action '{self._action_name}'" - ) + + try: + with self._lock: + goal_id = self._goal_sequence_number_to_goal_id[sequence_number] + pending_goal_request = self._pending_goal_requests[sequence_number] + except KeyError: + self._logger.warning( + 'Ignoring unexpected goal response. There may be more than ' + f"one action server for the action '{self._action_name}'" + ) + else: + goal_handle = ClientGoalHandle(self, goal_id, goal_response) + if goal_handle.accepted: + goal_uuid = bytes(goal_handle.goal_id.uuid) + if goal_uuid in self._goal_handles: + raise RuntimeError( + f'Two goals were accepted with the same ID ({goal_handle})') + self._goal_handles[goal_uuid] = weakref.ref(goal_handle) + + pending_goal_request.set_result(goal_handle) if 'cancel' in taken_data: sequence_number, cancel_response = taken_data['cancel'] - with self._lock: - if sequence_number in self._pending_cancel_requests: - self._pending_cancel_requests[sequence_number].set_result(cancel_response) - else: - self._logger.warning( - 'Ignoring unexpected cancel response. There may be more than ' - f"one action server for the action '{self._action_name}'" - ) + try: + with self._lock: + pending_cancel_request = self._pending_cancel_requests[sequence_number] + except KeyError: + self._logger.warning( + 'Ignoring unexpected cancel response. There may be more than ' + f"one action server for the action '{self._action_name}'" + ) + else: + pending_cancel_request.set_result(cancel_response) if 'result' in taken_data: sequence_number, result_response = taken_data['result'] - with self._lock: - if sequence_number in self._pending_result_requests: - self._pending_result_requests[sequence_number].set_result(result_response) - else: - self._logger.warning( - 'Ignoring unexpected result response. 
There may be more than ' - f"one action server for the action '{self._action_name}'" - ) + try: + with self._lock: + pending_result_request = self._pending_result_requests[sequence_number] + except KeyError: + self._logger.warning( + 'Ignoring unexpected result response. There may be more than ' + f"one action server for the action '{self._action_name}'" + ) + else: + pending_result_request.set_result(result_response) if 'feedback' in taken_data: feedback_msg = taken_data['feedback'] From 4247cb5e76458ede99ba227c9cc60ec6e4b8ccbd Mon Sep 17 00:00:00 2001 From: augustelalande Date: Sun, 17 Sep 2023 00:04:37 -0400 Subject: [PATCH 4/4] separate locks for goal, cancel, and result requests Signed-off-by: augustelalande --- rclpy/rclpy/action/client.py | 60 +++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/rclpy/rclpy/action/client.py b/rclpy/rclpy/action/client.py index 86ecdc659..43cdff394 100644 --- a/rclpy/rclpy/action/client.py +++ b/rclpy/rclpy/action/client.py @@ -182,7 +182,9 @@ def __init__( self._node.add_waitable(self) self._logger = self._node.get_logger().get_child('action_client') - self._lock = threading.Lock() + self._goal_lock = threading.Lock() + self._cancel_lock = threading.Lock() + self._result_lock = threading.Lock() def _generate_random_uuid(self): return UUID(uuid=list(uuid.uuid4().bytes)) @@ -200,34 +202,36 @@ def _remove_pending_request(self, future, pending_requests): :return: The sequence number associated with the removed future, or None if the future was not found in the list.
""" - with self._lock: - for seq, req_future in list(pending_requests.items()): - if future == req_future: - try: - del pending_requests[seq] - except KeyError: - pass - else: - self.remove_future(future) - return seq + for seq, req_future in list(pending_requests.items()): + if future == req_future: + try: + del pending_requests[seq] + except KeyError: + pass + else: + self.remove_future(future) + return seq return None def _remove_pending_goal_request(self, future): - seq = self._remove_pending_request(future, self._pending_goal_requests) - if seq in self._goal_sequence_number_to_goal_id: - del self._goal_sequence_number_to_goal_id[seq] + with self._goal_lock: + seq = self._remove_pending_request(future, self._pending_goal_requests) + if seq in self._goal_sequence_number_to_goal_id: + del self._goal_sequence_number_to_goal_id[seq] def _remove_pending_cancel_request(self, future): - self._remove_pending_request(future, self._pending_cancel_requests) + with self._cancel_lock: + self._remove_pending_request(future, self._pending_cancel_requests) def _remove_pending_result_request(self, future): - seq = self._remove_pending_request(future, self._pending_result_requests) - if seq in self._result_sequence_number_to_goal_id: - goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid) - del self._result_sequence_number_to_goal_id[seq] - # remove feeback_callback if user is aware of result and it's been received - if goal_uuid in self._feedback_callbacks: - del self._feedback_callbacks[goal_uuid] + with self._result_lock: + seq = self._remove_pending_request(future, self._pending_result_requests) + if seq in self._result_sequence_number_to_goal_id: + goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid) + del self._result_sequence_number_to_goal_id[seq] + # remove feeback_callback if user is aware of result and it's been received + if goal_uuid in self._feedback_callbacks: + del self._feedback_callbacks[goal_uuid] # Start Waitable API def 
is_ready(self, wait_set): @@ -291,7 +295,7 @@ async def execute(self, taken_data): sequence_number, goal_response = taken_data['goal'] try: - with self._lock: + with self._goal_lock: goal_id = self._goal_sequence_number_to_goal_id[sequence_number] pending_goal_request = self._pending_goal_requests[sequence_number] except KeyError: @@ -313,7 +317,7 @@ async def execute(self, taken_data): if 'cancel' in taken_data: sequence_number, cancel_response = taken_data['cancel'] try: - with self._lock: + with self._cancel_lock: pending_cancel_request = self._pending_cancel_requests[sequence_number] except KeyError: self._logger.warning( @@ -326,7 +330,7 @@ async def execute(self, taken_data): if 'result' in taken_data: sequence_number, result_response = taken_data['result'] try: - with self._lock: + with self._result_lock: pending_result_request = self._pending_result_requests[sequence_number] except KeyError: self._logger.warning( @@ -449,7 +453,7 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None): request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid request.goal = goal - with self._lock: + with self._goal_lock: sequence_number = self._client_handle.send_goal_request(request) if sequence_number in self._pending_goal_requests: raise RuntimeError( @@ -510,7 +514,7 @@ def _cancel_goal_async(self, goal_handle): cancel_request = CancelGoal.Request() cancel_request.goal_info.goal_id = goal_handle.goal_id - with self._lock: + with self._cancel_lock: sequence_number = self._client_handle.send_cancel_request(cancel_request) if sequence_number in self._pending_cancel_requests: raise RuntimeError( @@ -565,7 +569,7 @@ def _get_result_async(self, goal_handle): result_request = self._action_type.Impl.GetResultService.Request() result_request.goal_id = goal_handle.goal_id - with self._lock: + with self._result_lock: sequence_number = self._client_handle.send_result_request(result_request) if sequence_number in 
self._pending_result_requests: raise RuntimeError(