Skip to content

Commit

Permalink
ScheduledTask and TimerSet::Task now support deref and executor options.
Browse files Browse the repository at this point in the history
  • Loading branch information
jdantonio committed May 18, 2015
1 parent 217c46e commit 9174a1d
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 34 deletions.
1 change: 0 additions & 1 deletion .rspec
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
--require spec_helper
--color
--backtrace
--format documentation
9 changes: 6 additions & 3 deletions lib/concurrent/executor/timer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ class TimerSet < RubyExecutorService
class Task < Concurrent::IVar
include Comparable

attr_reader :executor

def initialize(parent, delay, args, task, opts = {})
super(IVar::NO_VALUE, opts, &nil)
synchronize do
ns_set_delay_and_time!(delay)
@parent = parent
@args = args
@task = task
@executor = Executor.executor_from_options(opts) || Concurrent.global_io_executor
self.observers = CopyOnNotifyObserverSet.new
end
end
Expand Down Expand Up @@ -134,7 +137,7 @@ def initialize(opts = {})
# @!macro deprecated_scheduling_by_clock_time
def post(delay, *args, &task)
raise ArgumentError.new('no block given') unless block_given?
task = Task.new(self, delay, args, task) # may raise exception
task = Task.new(self, delay, args, task, {executor: @task_executor}) # may raise exception
ok = synchronize{ ns_post_task(task) }
ok ? task : false
end
Expand Down Expand Up @@ -192,7 +195,7 @@ def post_task(task)
def ns_post_task(task)
return false unless ns_running?
if (task.original_delay) <= 0.01
@task_executor.post{ task.process_task }
task.executor.post{ task.process_task }
else
@queue.push(task)
# only post the process method when the queue is empty
Expand Down Expand Up @@ -248,7 +251,7 @@ def process_tasks
# queue now must have the same pop time, or a closer one, as
# when we peeked).
task = synchronize { @queue.pop }
@task_executor.post{ task.process_task }
task.executor.post{ task.process_task }
else
@condition.wait([diff, 60].min)
end
Expand Down
6 changes: 4 additions & 2 deletions lib/concurrent/scheduled_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ class ScheduledTask < TimerSet::Task
# but will not be supported in the 1.0 release.
def initialize(delay, opts = {}, &block)
raise ArgumentError.new('no block given') unless block_given?
super(Concurrent.global_timer_set, delay, [], block, &nil)
timer_set = opts.fetch(:timer_set, Concurrent.global_timer_set)
args = get_arguments_from(opts)
super(timer_set, delay, args, block, opts, &nil)
synchronize do
ns_set_state(:unscheduled)
@__original_delay__ = delay
Expand Down Expand Up @@ -182,7 +184,7 @@ def execute
#
# @!macro deprecated_scheduling_by_clock_time
def self.execute(delay, opts = {}, &block)
new(delay, &block).execute
new(delay, opts, &block).execute
end

# In the task execution in progress?
Expand Down
90 changes: 62 additions & 28 deletions spec/concurrent/scheduled_task_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'timecop'
require_relative 'dereferenceable_shared'
require_relative 'obligation_shared'
require_relative 'observable_shared'

Expand All @@ -8,8 +9,6 @@ module Concurrent

context 'behavior' do

# obligation

let!(:fulfilled_value) { 10 }
let!(:rejected_reason) { StandardError.new('mojo jojo') }

Expand All @@ -33,21 +32,30 @@ module Concurrent
task
end

it_should_behave_like :obligation

# dereferenceable

specify{ expect(ScheduledTask.ancestors).to include(Dereferenceable) }
def dereferenceable_subject(value, opts = {})
task = ScheduledTask.execute(0, opts){ value }.execute
task.value
task
end

# observable
def dereferenceable_observable(opts = {})
ScheduledTask.new(0, opts){ 'value' }
end

subject{ ScheduledTask.new(0.1){ nil } }
def execute_dereferenceable(subject)
subject.execute
subject.value
end

def trigger_observable(observable)
observable.execute
sleep(0.2)
end

subject{ ScheduledTask.new(0.1){ nil } }

it_should_behave_like :obligation
it_should_behave_like :dereferenceable
it_should_behave_like :observable
end

Expand Down Expand Up @@ -107,10 +115,6 @@ def trigger_observable(observable)
task.execute
end

it 'allows setting the execution interval to 0' do
expect { 1000.times { ScheduledTask.execute(0) { } } }.not_to raise_error
end

it 'sets the sate to :pending' do
task = ScheduledTask.new(1){ nil }
task.execute
Expand Down Expand Up @@ -145,6 +149,50 @@ def trigger_observable(observable)
end
end

context 'execution' do

it 'passes :args from the options to the block' do
expected = [1, 2, 3]
actual = nil
latch = Concurrent::CountDownLatch.new
task = ScheduledTask.execute(0, args: expected) do |*args|
actual = args
latch.count_down
end
latch.wait(2)
expect(actual).to eq expected
end

it 'uses the :executor from the options' do
latch = Concurrent::CountDownLatch.new
executor = Concurrent::ImmediateExecutor.new
expect(executor).to receive(:post).once.with(any_args).and_call_original
task = ScheduledTask.execute(0.1, executor: executor) do
latch.count_down
end
latch.wait(2)
end

it 'uses the :timer_set from the options' do
timer = Concurrent::TimerSet.new
expect(timer).to receive(:post_task).once.with(any_args).and_return(false)
task = ScheduledTask.execute(1, timer_set: timer){ nil }
end

it 'sets the state to :processing when the task is running' do
start_latch = Concurrent::CountDownLatch.new(1)
continue_latch = Concurrent::CountDownLatch.new(1)
task = ScheduledTask.new(0.1) {
start_latch.count_down
continue_latch.wait(2)
}.execute
start_latch.wait(2)
state = task.state
continue_latch.count_down
expect(state).to eq :processing
end
end

context '#cancel' do

it 'returns false if the task has already been performed' do
Expand All @@ -171,7 +219,6 @@ def trigger_observable(observable)
expect(latch.wait(0.3)).to be_falsey
end


it 'cancels the task if it has not yet started' do
latch = Concurrent::CountDownLatch.new(1)
task = ScheduledTask.new(0.3){ latch.count_down }.execute
Expand All @@ -195,19 +242,6 @@ def trigger_observable(observable)
end
end

context 'execution' do

it 'sets the state to :in_progress when the task is running' do
latch = Concurrent::CountDownLatch.new(1)
task = ScheduledTask.new(0.1) {
latch.count_down
sleep(1)
}.execute
latch.wait(1)
expect(task).to be_in_progress
end
end

context 'observation' do

let(:clazz) do
Expand Down Expand Up @@ -240,7 +274,7 @@ def update(time, value, reason)
expect(task.add_observer(observer)).to be_truthy
end

it 'returns true for an observer added while :in_progress' do
it 'returns true for an observer added while :processing' do
task = ScheduledTask.new(0.1){ sleep(1); 42 }.execute
sleep(0.2)
expect(task.add_observer(observer)).to be_truthy
Expand Down

0 comments on commit 9174a1d

Please sign in to comment.