Skip to content

Commit

Permalink
Improvements to TimerSet#cancel.
Browse files Browse the repository at this point in the history
  • Loading branch information
jdantonio committed May 18, 2015
1 parent 9498bfc commit 988ed27
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 20 deletions.
1 change: 0 additions & 1 deletion .yardopts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
--protected
--no-private
--embed-mixins
--output-dir ./yardoc
Expand Down
13 changes: 11 additions & 2 deletions lib/concurrent/collection/priority_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module Concurrent
# When running under JRuby the class `PriorityQueue` extends `JavaPriorityQueue`.
# When running under all other interpreters it extends `MutexPriorityQueue`.
#
# @note This implementation is *not* thread safe and performs no blocking.
# @note This implementation is *not* thread safe.
#
# @see http://en.wikipedia.org/wiki/Priority_queue
# @see http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html
Expand Down Expand Up @@ -305,6 +305,16 @@ def self.from_list(list, opts = {})
# @!macro priority_queue
class PriorityQueue < PriorityQueueImplementation

alias_method :has_priority?, :include?

alias_method :size, :length

alias_method :deq, :pop
alias_method :shift, :pop

alias_method :<<, :push
alias_method :enq, :push

# @!method initialize(opts = {})
# @!macro priority_queue_method_initialize

Expand Down Expand Up @@ -334,6 +344,5 @@ class PriorityQueue < PriorityQueueImplementation

# @!method self.from_list(list, opts = {})
# @!macro priority_queue_method_from_list

end
end
41 changes: 25 additions & 16 deletions lib/concurrent/executor/timer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@ module Concurrent
# @!macro monotonic_clock_warning
class TimerSet < RubyExecutorService

# A class for encapsulating a task and its intended execution time.
# It facilitates proper prioritization by overriding the comparison
# (spaceship) operator as a comparison of the intended execution
# times.
#
# @!visibility private
# An `IVar` representing a tasked queued for execution in a `TimerSet`.
class Task < Concurrent::IVar
include Comparable

# @!visibility private
def initialize(time, args, task)
def initialize(parent, time, args, task)
super()
synchronize do
@parent = parent
@time = time
@args = args
@task = task
Expand All @@ -40,20 +35,20 @@ def time
synchronize { @time }
end

# @!visibility private
def <=>(other)
self.time <=> other.time
end

# @!visibility private
def cancelled?
state == :cancelled
end

# @!visibility private
def cancel
if compare_and_set_state(:cancelled, :pending)
complete(false, nil, CancelledOperationError.new)
# To avoid deadlocks this call must occur outside of #synchronize
# Changing the state above should prevent redundant calls
@parent.send(:remove_task, self)
true
else
false
Expand All @@ -64,8 +59,9 @@ def cancel
def execute
safe_execute(@task, @args)
end

protected :set, :try_set
end
private_constant :Task

# Create a new set of timed tasks.
#
Expand All @@ -88,7 +84,8 @@ def initialize(opts = {})
#
# @yield the task to be performed
#
# @return [Boolean] true if the message is post, false after shutdown
# @return [Concurrent::TimerSet::Task, false] IVar representing the task if the post
# is successful; false after shutdown
#
# @raise [ArgumentError] if the intended execution time is not in the future
# @raise [ArgumentError] if no block is given
Expand All @@ -102,7 +99,7 @@ def post(delay, *args, &task)
return false unless running?

time = Concurrent.monotonic_time + delay
task = Task.new(time, args, task)
task = Task.new(self, time, args, task)

if (delay) <= 0.01
@task_executor.post{ task.execute }
Expand Down Expand Up @@ -152,6 +149,18 @@ def self.calculate_delay!(delay)

protected

# Remove the given task from the queue.
#
# @note This is intended as a callback method from Task only.
# It is not intended to be used directly. Cancel a task by
# using the `Task#cancel` method.
#
# @!visibility private
def remove_task(task)
synchronize{ @queue.delete(task) }
end

# @!visibility private
def ns_initialize(opts)
@queue = PriorityQueue.new(order: :min)
@task_executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
Expand Down Expand Up @@ -204,9 +213,9 @@ def process_tasks

private

# @!visibility private
def <<(task)
post(0.0, &task)
self
raise NotImplementedError.new
end
end
end
2 changes: 1 addition & 1 deletion lib/concurrent/utility/timer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Concurrent
#
# @yield the task to execute
#
# @return [Boolean] true
# @return [Concurrent::TimerSet::Task] IVar representing the task
#
# @!macro monotonic_clock_warning
def timer(seconds, *args, &block)
Expand Down

0 comments on commit 988ed27

Please sign in to comment.