diff --git a/.yardopts b/.yardopts index 4fa836aca..77e7d3bb4 100644 --- a/.yardopts +++ b/.yardopts @@ -1,4 +1,3 @@ ---protected --no-private --embed-mixins --output-dir ./yardoc diff --git a/lib/concurrent/collection/priority_queue.rb b/lib/concurrent/collection/priority_queue.rb index b2266c904..1e49b14b7 100644 --- a/lib/concurrent/collection/priority_queue.rb +++ b/lib/concurrent/collection/priority_queue.rb @@ -21,7 +21,7 @@ module Concurrent # When running under JRuby the class `PriorityQueue` extends `JavaPriorityQueue`. # When running under all other interpreters it extends `MutexPriorityQueue`. # - # @note This implementation is *not* thread safe and performs no blocking. + # @note This implementation is *not* thread safe. # # @see http://en.wikipedia.org/wiki/Priority_queue # @see http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html @@ -305,6 +305,16 @@ def self.from_list(list, opts = {}) # @!macro priority_queue class PriorityQueue < PriorityQueueImplementation + alias_method :has_priority?, :include? + + alias_method :size, :length + + alias_method :deq, :pop + alias_method :shift, :pop + + alias_method :<<, :push + alias_method :enq, :push + # @!method initialize(opts = {}) # @!macro priority_queue_method_initialize @@ -334,6 +344,5 @@ class PriorityQueue < PriorityQueueImplementation # @!method self.from_list(list, opts = {}) # @!macro priority_queue_method_from_list - end end diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index 769a6cd41..60fbe7173 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -16,19 +16,14 @@ module Concurrent # @!macro monotonic_clock_warning class TimerSet < RubyExecutorService - # A class for encapsulating a task and its intended execution time. - # It facilitates proper prioritization by overriding the comparison - # (spaceship) operator as a comparison of the intended execution - # times. - # - # @!visibility private + # An `IVar` representing a tasked queued for execution in a `TimerSet`. class Task < Concurrent::IVar include Comparable - # @!visibility private - def initialize(time, args, task) + def initialize(parent, time, args, task) super() synchronize do + @parent = parent @time = time @args = args @task = task @@ -40,20 +35,20 @@ def time synchronize { @time } end - # @!visibility private def <=>(other) self.time <=> other.time end - # @!visibility private def cancelled? state == :cancelled end - # @!visibility private def cancel if compare_and_set_state(:cancelled, :pending) complete(false, nil, CancelledOperationError.new) + # To avoid deadlocks this call must occur outside of #synchronize + # Changing the state above should prevent redundant calls + @parent.send(:remove_task, self) true else false @@ -64,8 +59,9 @@ def cancel def execute safe_execute(@task, @args) end + + protected :set, :try_set end - private_constant :Task # Create a new set of timed tasks. # @@ -88,7 +84,8 @@ def initialize(opts = {}) # # @yield the task to be performed # - # @return [Boolean] true if the message is post, false after shutdown + # @return [Concurrent::TimerSet::Task, false] IVar representing the task if the post + # is successful; false after shutdown # # @raise [ArgumentError] if the intended execution time is not in the future # @raise [ArgumentError] if no block is given @@ -102,7 +99,7 @@ def post(delay, *args, &task) return false unless running? time = Concurrent.monotonic_time + delay - task = Task.new(time, args, task) + task = Task.new(self, time, args, task) if (delay) <= 0.01 @task_executor.post{ task.execute } @@ -152,6 +149,18 @@ def self.calculate_delay!(delay) protected + # Remove the given task from the queue. + # + # @note This is intended as a callback method from Task only. + # It is not intended to be used directly. Cancel a task by + # using the `Task#cancel` method. + # + # @!visibility private + def remove_task(task) + synchronize{ @queue.delete(task) } + end + + # @!visibility private def ns_initialize(opts) @queue = PriorityQueue.new(order: :min) @task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor @@ -204,9 +213,9 @@ def process_tasks private + # @!visibility private def <<(task) - post(0.0, &task) - self + raise NotImplementedError.new end end end diff --git a/lib/concurrent/utility/timer.rb b/lib/concurrent/utility/timer.rb index ea421e047..80b5accb0 100644 --- a/lib/concurrent/utility/timer.rb +++ b/lib/concurrent/utility/timer.rb @@ -9,7 +9,7 @@ module Concurrent # # @yield the task to execute # - # @return [Boolean] true + # @return [Concurrent::TimerSet::Task] IVar representing the task # # @!macro monotonic_clock_warning def timer(seconds, *args, &block)