Skip to content

Commit

Permalink
Replace spin_until_future_complete with spin_until_complete, add spin…
Browse files Browse the repository at this point in the history
…_for method

* Deprecate spin_until_future_complete
* Add spin_until_complete
* Add spin_for method
* Udpdate unit tests

Signed-off-by: Hubert Liberacki <[email protected]>
  • Loading branch information
hliberacki committed Mar 30, 2022
1 parent 8e8c978 commit 5c75743
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 77 deletions.
56 changes: 49 additions & 7 deletions rclpy/rclpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
After a node is created, items of work can be done (e.g. subscription callbacks) by *spinning* on
the node.
The following functions can be used to process work that is waiting to be executed: :func:`spin`,
:func:`spin_once`, and :func:`spin_until_future_complete`.
:func:`spin_once`, and :func:`spin_until_complete`.
When finished with a previously initialized :class:`.Context` (ie. done using
all ROS nodes associated with the context), the :func:`shutdown` function should be called.
Expand Down Expand Up @@ -223,6 +223,51 @@ def spin(node: 'Node', executor: 'Executor' = None) -> None:
finally:
executor.remove_node(node)

def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None:
"""
Execute work and block until the context associated with the executor is shutdown
or time duration pass.
Callbacks will be executed by the provided executor.
This function blocks.
:param node: A node to add to the executor to check for work.
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait.
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_once(duration_sec)
finally:
executor.remove_node(node)


def spin_until_complete(
node: 'Node',
condition,
executor: 'Executor' = None,
timeout_sec: float = None
) -> None:
"""
Execute work until the condition is complete.
Callbacks and other work will be executed by the provided executor until ``condition()`` or
``future.done()`` returns ``True`` or the context associated with the executor is shutdown.
:param node: A node to add to the executor to check for work.
:param condition: The callable or future object to wait on.
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait. Block until the condition is complete
if ``None`` or negative. Don't wait if 0.
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_until_complete(condition, timeout_sec)
finally:
executor.remove_node(node)

def spin_until_future_complete(
node: 'Node',
Expand All @@ -241,10 +286,7 @@ def spin_until_future_complete(
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait. Block until the future is complete
if ``None`` or negative. Don't wait if 0.
Deprecated in favor of spin_until_complete
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_until_future_complete(future, timeout_sec)
finally:
executor.remove_node(node)
spin_until_complete(node, future, executor, timeout_sec)
88 changes: 71 additions & 17 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ def __init__(self, *, context: Context = None) -> None:
self._nodes: Set[Node] = set()
self._nodes_lock = RLock()
# Tasks to be executed (oldest first) 3-tuple Task, Entity, Node
self._tasks: List[Tuple[Task, Optional[WaitableEntityType], Optional[Node]]] = []
self._tasks: List[Tuple[Task,
Optional[WaitableEntityType], Optional[Node]]] = []
self._tasks_lock = Lock()
# This is triggered when wait_for_ready_callbacks should rebuild the wait list
self._guard = GuardCondition(
Expand Down Expand Up @@ -278,28 +279,52 @@ def spin(self) -> None:
while self._context.ok() and not self._is_shutdown:
self.spin_once()

def spin_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
"""Execute callbacks until a given future is done or a timeout occurs."""
# Make sure the future wakes this executor when it is done
future.add_done_callback(lambda x: self.wake())
def spin_for(self, duration_sec: float = None) -> None:
"""Execute callbacks until shutdown, or timeout"""
self.spin_until_complete(lambda: False, duration_sec)

def spin_until_complete(self, condition, timeout_sec: float = None) -> None:
"""
Execute callbacks until a given condition is done or a timeout occurs.
Deprecated in favor of spin_until_complete.
"""
# Common conditon for safisfying both Callable and Future
finish_condition = None
if (isinstance(condition, Future)):
# Make sure the future wakes this executor when it is done
condition.add_done_callback(lambda x: self.wake())
finish_condition = lambda: condition.done()
elif (callable(condition)):
finish_condition = lambda: condition()
else:
raise TypeError("Condition has to be of Future or Callable type")

if timeout_sec is None or timeout_sec < 0:
while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_sec)
while self._context.ok() and not finish_condition() and not self._is_shutdown:
self.spin_once_until_complete(condition, timeout_sec)
else:
start = time.monotonic()
end = start + timeout_sec
timeout_left = timeout_sec

while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_left)
while self._context.ok() and not finish_condition() and not self._is_shutdown:
self.spin_once_until_complete(condition, timeout_left)
now = time.monotonic()

if now >= end:
return

timeout_left = end - now

def spin_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
"""
Execute callbacks until a given future is done or a timeout occurs.
Deprecated in favor of spin_until_complete.
"""
self.spin_until_complete(future, timeout_sec)

def spin_once(self, timeout_sec: float = None) -> None:
"""
Wait for and execute a single callback.
Expand All @@ -311,6 +336,19 @@ def spin_once(self, timeout_sec: float = None) -> None:
"""
raise NotImplementedError()

def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None:
"""
Wait for and execute a single callback.
This should behave in the same way as :meth:`spin_once`.
If needed by the implementation, it should awake other threads waiting.
:param condition: The executor will wait until this condition is done.
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
Don't wait if 0.
"""
raise NotImplementedError()

def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
"""
Wait for and execute a single callback.
Expand All @@ -321,6 +359,8 @@ def spin_once_until_future_complete(self, future: Future, timeout_sec: float = N
:param future: The executor will wait until this future is done.
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
Don't wait if 0.
Deprecated in favor of spin_once_until_complete.
"""
raise NotImplementedError()

Expand Down Expand Up @@ -361,7 +401,8 @@ async def _execute_client(self, client, seq_and_response):

def _take_service(self, srv):
with srv.handle:
request_and_header = srv.handle.service_take_request(srv.srv_type.Request)
request_and_header = srv.handle.service_take_request(
srv.srv_type.Request)
return request_and_header

async def _execute_service(self, srv, request_and_header):
Expand Down Expand Up @@ -422,7 +463,8 @@ async def handler(entity, gc, is_shutdown, work_tracker):
# callback group can get executed
gc.trigger()
task = Task(
handler, (entity, self._guard, self._is_shutdown, self._work_tracker),
handler, (entity, self._guard,
self._is_shutdown, self._work_tracker),
executor=self)
with self._tasks_lock:
self._tasks.append((task, entity, node))
Expand Down Expand Up @@ -458,7 +500,8 @@ def _wait_for_ready_callbacks(
timeout_timer = None
timeout_nsec = timeout_sec_to_nsec(timeout_sec)
if timeout_nsec > 0:
timeout_timer = Timer(None, None, timeout_nsec, self._clock, context=self._context)
timeout_timer = Timer(None, None, timeout_nsec,
self._clock, context=self._context)

yielded_work = False
while not yielded_work and not self._is_shutdown and not condition():
Expand All @@ -479,7 +522,8 @@ def _wait_for_ready_callbacks(
yield task, entity, node
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))
self._tasks = list(
filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))

# Gather entities that can be waited on
subscriptions: List[Subscription] = []
Expand All @@ -489,7 +533,8 @@ def _wait_for_ready_callbacks(
services: List[Service] = []
waitables: List[Waitable] = []
for node in nodes_to_use:
subscriptions.extend(filter(self.can_execute, node.subscriptions))
subscriptions.extend(
filter(self.can_execute, node.subscriptions))
timers.extend(filter(self.can_execute, node.timers))
clients.extend(filter(self.can_execute, node.clients))
services.extend(filter(self.can_execute, node.services))
Expand Down Expand Up @@ -701,7 +746,8 @@ def __init__(self, *, context: Context = None) -> None:

def spin_once(self, timeout_sec: float = None) -> None:
try:
handler, entity, node = self.wait_for_ready_callbacks(timeout_sec=timeout_sec)
handler, entity, node = self.wait_for_ready_callbacks(
timeout_sec=timeout_sec)
except ShutdownException:
pass
except TimeoutException:
Expand All @@ -711,9 +757,13 @@ def spin_once(self, timeout_sec: float = None) -> None:
if handler.exception() is not None:
raise handler.exception()

def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None:
self.spin_once(timeout_sec)

"""Deprecated in favor of spin_once_until_complete"""
def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
self.spin_once_until_complete(timeout_sec)


class MultiThreadedExecutor(Executor):
"""
Expand Down Expand Up @@ -756,5 +806,9 @@ def _spin_once_impl(
def spin_once(self, timeout_sec: float = None) -> None:
self._spin_once_impl(timeout_sec)

def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None:
self._spin_once_impl(timeout_sec, condition if callable(condition) else condition.done)

"""Deprecated in favor of spin_once_until_complete"""
def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None:
self._spin_once_impl(timeout_sec, future.done)
self.spin_once_until_complete(timeout_sec, future.done)
26 changes: 13 additions & 13 deletions rclpy/test/test_action_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def test_send_goal_async(self):
try:
self.assertTrue(ac.wait_for_server(timeout_sec=2.0))
future = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)
self.assertTrue(future.done())
goal_handle = future.result()
self.assertTrue(goal_handle.accepted)
Expand All @@ -177,7 +177,7 @@ def test_send_goal_async_with_feedback_after_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Publish feedback after goal has been accepted
self.mock_action_server.publish_feedback(goal_uuid)
Expand All @@ -202,7 +202,7 @@ def test_send_goal_async_with_feedback_before_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Check the feedback was not received
self.assertEqual(self.feedback, None)
Expand All @@ -220,14 +220,14 @@ def test_send_goal_async_with_feedback_for_another_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=first_goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Send another goal, but without a feedback callback
second_goal_uuid = UUID(uuid=list(uuid.uuid4().bytes))
future = ac.send_goal_async(
Fibonacci.Goal(),
goal_uuid=second_goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Publish feedback for the second goal
self.mock_action_server.publish_feedback(second_goal_uuid)
Expand All @@ -251,7 +251,7 @@ def test_send_goal_async_with_feedback_for_not_a_goal(self):
Fibonacci.Goal(),
feedback_callback=self.feedback_callback,
goal_uuid=goal_uuid)
rclpy.spin_until_future_complete(self.node, future, self.executor)
rclpy.spin_until_complete(self.node, future, self.executor)

# Publish feedback for a non-existent goal ID
self.mock_action_server.publish_feedback(UUID(uuid=list(uuid.uuid4().bytes)))
Expand All @@ -272,9 +272,9 @@ def test_send_goal_multiple(self):
future_0 = ac.send_goal_async(Fibonacci.Goal())
future_1 = ac.send_goal_async(Fibonacci.Goal())
future_2 = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, future_0, executor)
rclpy.spin_until_future_complete(self.node, future_1, executor)
rclpy.spin_until_future_complete(self.node, future_2, executor)
rclpy.spin_until_complete(self.node, future_0, executor)
rclpy.spin_until_complete(self.node, future_1, executor)
rclpy.spin_until_complete(self.node, future_2, executor)
self.assertTrue(future_0.done())
self.assertTrue(future_1.done())
self.assertTrue(future_2.done())
Expand All @@ -300,13 +300,13 @@ def test_send_cancel_async(self):

# Send a goal
goal_future = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, goal_future, self.executor)
rclpy.spin_until_complete(self.node, goal_future, self.executor)
self.assertTrue(goal_future.done())
goal_handle = goal_future.result()

# Cancel the goal
cancel_future = goal_handle.cancel_goal_async()
rclpy.spin_until_future_complete(self.node, cancel_future, self.executor)
rclpy.spin_until_complete(self.node, cancel_future, self.executor)
self.assertTrue(cancel_future.done())
self.assertEqual(
cancel_future.result().goals_canceling[0].goal_id,
Expand All @@ -321,13 +321,13 @@ def test_get_result_async(self):

# Send a goal
goal_future = ac.send_goal_async(Fibonacci.Goal())
rclpy.spin_until_future_complete(self.node, goal_future, self.executor)
rclpy.spin_until_complete(self.node, goal_future, self.executor)
self.assertTrue(goal_future.done())
goal_handle = goal_future.result()

# Get the goal result
result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self.node, result_future, self.executor)
rclpy.spin_until_complete(self.node, result_future, self.executor)
self.assertTrue(result_future.done())
finally:
ac.destroy()
Expand Down
Loading

0 comments on commit 5c75743

Please sign in to comment.