diff --git a/lib/syskit/telemetry/async/name_service.rb b/lib/syskit/telemetry/async/name_service.rb index 5d2d03257..388bb8573 100644 --- a/lib/syskit/telemetry/async/name_service.rb +++ b/lib/syskit/telemetry/async/name_service.rb @@ -159,36 +159,63 @@ def wait_for_task_discovery # @api private # - # Remove a resolved task from the pending discoveries + # Find a valid resolved task from the pending discoveries # - # @return [(String,TaskContext),nil] a resolved task or nil - # if there are none so far + # @return [AsyncDiscovery,nil] a valid resolved task or nil if there are + # none so far def pop_discovered_task loop do - async_discovery = @discovery.each_value.find(&:resolved?) - return unless async_discovery - - @discovery.delete(async_discovery.task.name) - async_discovery.update_from_result - - current_ior = @iors.get[async_discovery.task.name] - next unless current_ior - - if async_discovery.ior != current_ior - # The IOR associated with that name changed since the future - # started processing. Throw away the resolved task and start - # again - async_discovery.async_task&.dispose - async_discover_task(async_discovery.task) - next - end - + return unless (async_discovery = pop_finished_discovery) + next unless finished_discovery_validate_ior(async_discovery) next unless async_discovery.async_task return async_discovery end end + # @api private + # + # Get one async discovery result from the terminated discovery futures + # + # Unlike {pop_discovered_task}, it will not try to find a valid discovered + # task. It only gets one finished result + # + # @return [AsyncDiscovery] + def pop_finished_discovery + async_discovery = @discovery.each_value.find(&:resolved?) + return unless async_discovery + + @discovery.delete(async_discovery.task.name) + async_discovery.update_from_result + async_discovery + end + + # @api private + # + # Validate that an async discovery result matches the expected IOR + # for the task + # + # To guard against race conditions, the name service object maintains + # a hash of the task names to the expected IORs. When we fetch an async + # discovery result, we validate that the found task is actually pointing + # to the expected IOR. If it is not, the result is thrown away and a + # new discovery is initiated + # + # @param [AsyncDiscovery] async_discovery + def finished_discovery_validate_ior(async_discovery) + current_ior = @iors.get[async_discovery.task.name] + return unless current_ior + + return true if async_discovery.ior == current_ior + + # The IOR associated with that name changed since the future + # started processing. Throw away the resolved task and start + # again + async_discovery.async_task&.dispose + async_discover_task(async_discovery.task) + false + end + # @api private # # Discover a single task @@ -225,7 +252,9 @@ def orogen_model_from_name(name) # (see NameServiceBase#get) def ior(name) task = @registered_tasks[name] - return task.identity if task.respond_to?(:ior) + if (identity = task&.identity) + return identity + end raise Orocos::NotFound, "task context #{name} cannot be found." end