Skip to content

Commit

Permalink
seperate locks for goal, cancel, and result requests
Browse files Browse the repository at this point in the history
  • Loading branch information
augustelalande committed Sep 17, 2023
1 parent 09881ed commit dc82cf6
Showing 1 changed file with 32 additions and 28 deletions.
60 changes: 32 additions & 28 deletions rclpy/rclpy/action/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ def __init__(
self._node.add_waitable(self)
self._logger = self._node.get_logger().get_child('action_client')

self._lock = threading.Lock()
self._goal_lock = threading.Lock()
self._cancel_lock = threading.Lock()
self._result_lock = threading.Lock()

def _generate_random_uuid(self):
return UUID(uuid=list(uuid.uuid4().bytes))
Expand All @@ -200,34 +202,36 @@ def _remove_pending_request(self, future, pending_requests):
:return: The sequence number associated with the removed future, or
None if the future was not found in the list.
"""
with self._lock:
for seq, req_future in list(pending_requests.items()):
if future == req_future:
try:
del pending_requests[seq]
except KeyError:
pass
else:
self.remove_future(future)
return seq
for seq, req_future in list(pending_requests.items()):
if future == req_future:
try:
del pending_requests[seq]
except KeyError:
pass
else:
self.remove_future(future)
return seq
return None

def _remove_pending_goal_request(self, future):
seq = self._remove_pending_request(future, self._pending_goal_requests)
if seq in self._goal_sequence_number_to_goal_id:
del self._goal_sequence_number_to_goal_id[seq]
with self._goal_lock:
seq = self._remove_pending_request(future, self._pending_goal_requests)
if seq in self._goal_sequence_number_to_goal_id:
del self._goal_sequence_number_to_goal_id[seq]

def _remove_pending_cancel_request(self, future):
self._remove_pending_request(future, self._pending_cancel_requests)
with self._cancel_lock:
self._remove_pending_request(future, self._pending_cancel_requests)

def _remove_pending_result_request(self, future):
seq = self._remove_pending_request(future, self._pending_result_requests)
if seq in self._result_sequence_number_to_goal_id:
goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid)
del self._result_sequence_number_to_goal_id[seq]
# remove feeback_callback if user is aware of result and it's been received
if goal_uuid in self._feedback_callbacks:
del self._feedback_callbacks[goal_uuid]
with self._result_lock:
seq = self._remove_pending_request(future, self._pending_result_requests)
if seq in self._result_sequence_number_to_goal_id:
goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid)
del self._result_sequence_number_to_goal_id[seq]
# remove feeback_callback if user is aware of result and it's been received
if goal_uuid in self._feedback_callbacks:
del self._feedback_callbacks[goal_uuid]

# Start Waitable API
def is_ready(self, wait_set):
Expand Down Expand Up @@ -291,7 +295,7 @@ async def execute(self, taken_data):
sequence_number, goal_response = taken_data['goal']

try:
with self._lock:
with self._goal_lock:
goal_id = self._goal_sequence_number_to_goal_id[sequence_number]
pending_goal_request = self._pending_goal_requests[sequence_number]
except KeyError:
Expand All @@ -313,7 +317,7 @@ async def execute(self, taken_data):
if 'cancel' in taken_data:
sequence_number, cancel_response = taken_data['cancel']
try:
with self._lock:
with self._cancel_lock:
pending_cancel_request = self._pending_cancel_requests[sequence_number]
except KeyError:
self._logger.warning(
Expand All @@ -326,7 +330,7 @@ async def execute(self, taken_data):
if 'result' in taken_data:
sequence_number, result_response = taken_data['result']
try:
with self._lock:
with self._result_lock:
pending_result_request = self._pending_result_requests[sequence_number]
except KeyError:
self._logger.warning(
Expand Down Expand Up @@ -449,7 +453,7 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
request.goal = goal

with self._lock:
with self._goal_lock:
sequence_number = self._client_handle.send_goal_request(request)
if sequence_number in self._pending_goal_requests:
raise RuntimeError(
Expand Down Expand Up @@ -510,7 +514,7 @@ def _cancel_goal_async(self, goal_handle):
cancel_request = CancelGoal.Request()
cancel_request.goal_info.goal_id = goal_handle.goal_id

with self._lock:
with self._cancel_lock:
sequence_number = self._client_handle.send_cancel_request(cancel_request)
if sequence_number in self._pending_cancel_requests:
raise RuntimeError(
Expand Down Expand Up @@ -565,7 +569,7 @@ def _get_result_async(self, goal_handle):
result_request = self._action_type.Impl.GetResultService.Request()
result_request.goal_id = goal_handle.goal_id

with self._lock:
with self._result_lock:
sequence_number = self._client_handle.send_result_request(result_request)
if sequence_number in self._pending_result_requests:
raise RuntimeError(
Expand Down

0 comments on commit dc82cf6

Please sign in to comment.