diff --git a/README.md b/README.md index 87f013d1..49b4fd59 100644 --- a/README.md +++ b/README.md @@ -5,14 +5,15 @@ -Mosquito is a generic background job runner written primarily for Crystal. Significant inspiration from experience with the successes and failings many Ruby gems in this vein. +Mosquito is a generic background job runner written primarily for Crystal. Significant inspiration from experience with the successes and failings of many Ruby gems in this vein. Once compiled, a mosquito binary can start work in about 10 milliseconds. Mosquito currently provides these features: -- Delayed execution -- Scheduled / Periodic execution + +- Delayed execution (`SendEmailJob.new(email: :welcome, address: user.email).enqueue(in: 3.minutes)`) +- Scheduled / Periodic execution (`RunEveryHourJob.new`) - Job Storage in Redis - Automatic rescheduling of failed jobs -- Progressively increasing delay of failed jobs +- Progressively increasing delay of rescheduled failed jobs - Dead letter queue of jobs which have failed too many times - Rate limited jobs diff --git a/demo/run.cr b/demo/run.cr index 0fb5d77b..f95cbd19 100644 --- a/demo/run.cr +++ b/demo/run.cr @@ -2,10 +2,19 @@ require "../src/mosquito" Mosquito.configure do |settings| settings.redis_url = ENV["REDIS_URL"]? || "redis://localhost:6379/3" + settings.idle_wait = 1.second end Mosquito.configuration.backend.flush +Log.setup do |c| + backend = Log::IOBackend.new + + c.bind "redis.*", :warn, backend + c.bind "mosquito.*", :debug, backend + c.bind "mosquito.runners.overseer", :trace, backend +end + require "./jobs/*" def expect_run_count(klass, expected) @@ -17,8 +26,15 @@ def expect_run_count(klass, expected) end end +stopping = false Signal::INT.trap do + if stopping + puts "SIGINT received again, crash-exiting." 
+ exit 1 + end + Mosquito::Runner.stop + stopping = true end Mosquito::Runner.start(spin: false) @@ -29,7 +45,7 @@ while count <= 20 && Mosquito::Runner.keep_running count += 1 end -Mosquito::Runner.stop +Mosquito::Runner.stop(wait: true) puts "End of demo." puts "----------------------------------" diff --git a/src/mosquito/configuration.cr b/src/mosquito/configuration.cr index 7b4196f2..ae596a93 100644 --- a/src/mosquito/configuration.cr +++ b/src/mosquito/configuration.cr @@ -13,8 +13,8 @@ module Mosquito property failed_job_ttl : Int32 = 86400 @[Deprecated("cron scheduling can now handled automatically. See https://github.com/mosquito-cr/mosquito/pull/108")] - property run_cron_scheduler : Bool = true - property use_distributed_lock : Bool = false + property run_cron_scheduler : Bool = false + property use_distributed_lock : Bool = true property run_from : Array(String) = [] of String property backend : Mosquito::Backend.class = Mosquito::RedisBackend diff --git a/src/mosquito/periodic_job_run.cr b/src/mosquito/periodic_job_run.cr index f56b422e..f758ca2c 100644 --- a/src/mosquito/periodic_job_run.cr +++ b/src/mosquito/periodic_job_run.cr @@ -49,6 +49,9 @@ module Mosquito if last_executed_at + interval <= now execute + # Weaknesses: + # - If something interferes with the job run, it won't be correct that it was executed. + # - if the worker is backlogged, the start time will be different from the last executed time. self.last_executed_at = now true else diff --git a/src/mosquito/queue.cr b/src/mosquito/queue.cr index 2eb7d6d5..5ebf9ddc 100644 --- a/src/mosquito/queue.cr +++ b/src/mosquito/queue.cr @@ -79,6 +79,8 @@ module Mosquito getter? 
empty : Bool property backend : Mosquito::Backend + Log = ::Log.for self + def initialize(@name : String) @empty = false @backend = Mosquito.backend.named name @@ -86,6 +88,7 @@ module Mosquito end def enqueue(job_run : JobRun) : JobRun + Log.trace { "Enqueuing #{job_run} for immediate execution" } backend.enqueue job_run end @@ -94,6 +97,7 @@ module Mosquito end def enqueue(job_run : JobRun, at execute_time : Time) : JobRun + Log.trace { "Enqueuing #{job_run} at #{execute_time}" } backend.schedule job_run, execute_time end diff --git a/src/mosquito/runner.cr b/src/mosquito/runner.cr index 2e78fc07..5735747e 100644 --- a/src/mosquito/runner.cr +++ b/src/mosquito/runner.cr @@ -4,47 +4,46 @@ module Mosquito class Runner Log = ::Log.for self - # Should mosquito continue working? - class_property keep_running : Bool = true - # Start the mosquito runner. # # If spin = true (default) the function will not return until the runner is # shut down. Otherwise it will return immediately. # - def self.start(spin = true) Log.notice { "Mosquito is buzzing..." } + def self.start(spin = true) + Log.notice { "Mosquito is buzzing..." } instance.run - while spin && @@keep_running - sleep 1 + while spin && keep_running + Fiber.yield end end + def self.keep_running : Bool + instance.state.starting? || instance.state.running? + end + # Request the mosquito runner stop. The runner will not abort the current job # but it will not start any new jobs. - def self.stop + def self.stop(wait = false) Log.notice { "Mosquito is shutting down..." 
} - self.keep_running = false - instance.stop + finished_notifier = instance.stop + + if wait + finished_notifier.receive + end end private def self.instance : self @@instance ||= new end + delegate run, stop, state, to: @overseer + delegate running?, to: @overseer.state + getter overseer : Runners::Overseer + def initialize Mosquito.configuration.validate @overseer = Runners::Overseer.new end - - def run - spawn do - @overseer.run - end - end - - def stop - @overseer.stop - end end end diff --git a/src/mosquito/runners/coordinator.cr b/src/mosquito/runners/coordinator.cr index 84faba2c..9b5324e4 100644 --- a/src/mosquito/runners/coordinator.cr +++ b/src/mosquito/runners/coordinator.cr @@ -1,5 +1,5 @@ module Mosquito::Runners - # primer? loader? + # primer? loader? _scheduler_ class Coordinator Log = ::Log.for self LockTTL = 10.seconds @@ -15,7 +15,11 @@ module Mosquito::Runners @emitted_scheduling_deprecation_runtime_message = false end - def bloop + def runnable_name : String + "Coordinator<#{object_id}>" + end + + def schedule : Nil only_if_coordinator do enqueue_periodic_jobs enqueue_delayed_jobs @@ -41,13 +45,13 @@ module Mosquito::Runners end if Mosquito.backend.lock? 
lock_key, instance_id, LockTTL - Log.debug { "Coordinator lock acquired" } + Log.trace { "Coordinator lock acquired" } duration = Time.measure do yield end Mosquito.backend.unlock lock_key, instance_id - Log.debug { "Coordinator lock released" } + Log.trace { "Coordinator lock released" } end return unless duration > LockTTL @@ -57,10 +61,6 @@ module Mosquito::Runners def enqueue_periodic_jobs Base.scheduled_job_runs.each do |scheduled_job_run| enqueued = scheduled_job_run.try_to_execute - - Log.for("enqueue_periodic_jobs").info { - "enqueued #{scheduled_job_run.class}" if enqueued - } end end diff --git a/src/mosquito/runners/executor.cr b/src/mosquito/runners/executor.cr index 7e23cb44..7cbbeb14 100644 --- a/src/mosquito/runners/executor.cr +++ b/src/mosquito/runners/executor.cr @@ -1,3 +1,6 @@ +require "./run_at_most" +require "./runnable" + module Mosquito::Runners # An Executor is responsible for building Job classes with deserialized # parameters and calling #run on them. It measures the time it takes to @@ -7,8 +10,10 @@ module Mosquito::Runners # if it will return only after a relative eternity. 
class Executor include RunAtMost + include Runnable Log = ::Log.for self + getter log : ::Log # How long a job config is persisted after success property successful_job_ttl : Int32 { Mosquito.configuration.successful_job_ttl } @@ -16,29 +21,53 @@ module Mosquito::Runners # How long a job config is persisted after failure property failed_job_ttl : Int32 { Mosquito.configuration.failed_job_ttl } - getter queue_list : QueueList + getter job_pipeline : Channel(Tuple(JobRun, Queue)) + getter idle_bell : Channel(Bool) + + private def state=(state : State) + if state == State::Idle + spawn { idle_bell.send true } + end - def initialize(@queue_list) + super end - def dequeue_and_run_jobs - queue_list.each do |q| - run_next_job q - end + def initialize(@job_pipeline, @idle_bell) + @log = Log.for(object_id.to_s) end - def run_next_job(q : Queue) - job_run = q.dequeue - return unless job_run + def runnable_name : String + "Executor<#{object_id}>" + end - Log.notice { "#{"Starting:".colorize.magenta} #{job_run} from #{q.name}" } + def pre_run : Nil + # Overseer won't try to dequeue and send any jobs unless it + # knows that an executor is idle, so the first thing to do + # is mark this executor as idle. + self.state = State::Idle + end + + def each_run : Nil + dequeue = job_pipeline.receive? + return if dequeue.nil? + + self.state = State::Working + job_run, queue = dequeue + log.trace { "Dequeued #{job_run} from #{queue.name}" } + execute job_run, queue + log.trace { "Finished #{job_run} from #{queue.name}" } + self.state = State::Idle + end + + def execute(job_run : JobRun, from_queue q : Queue) + log.info { "#{"Starting:".colorize.magenta} #{job_run} from #{q.name}" } duration = Time.measure do job_run.run end.total_seconds if job_run.succeeded? 
- Log.notice { "#{"Success:".colorize.green} #{job_run} finished and took #{time_with_units duration}" } + log.info { "#{"Success:".colorize.green} #{job_run} finished and took #{time_with_units duration}" } q.forget job_run job_run.delete in: successful_job_ttl @@ -60,14 +89,14 @@ module Mosquito::Runners message << " (at " message << next_execution message << ")" + log.warn { message.to_s } else q.banish job_run job_run.delete in: failed_job_ttl message << "cannot be rescheduled".colorize.yellow + log.error { message.to_s } end - - Log.warn { message.to_s } end end diff --git a/src/mosquito/runners/idle_wait.cr b/src/mosquito/runners/idle_wait.cr new file mode 100644 index 00000000..0483f38b --- /dev/null +++ b/src/mosquito/runners/idle_wait.cr @@ -0,0 +1,14 @@ +module Mosquito::Runners + module IdleWait + def with_idle_wait(idle_wait : Time::Span) + delta = Time.measure do + yield + end + + if delta < idle_wait + # Fiber.timeout(idle_wait - delta) + sleep(idle_wait - delta) + end + end + end +end diff --git a/src/mosquito/runners/overseer.cr b/src/mosquito/runners/overseer.cr index bd3aa07a..26dbd898 100644 --- a/src/mosquito/runners/overseer.cr +++ b/src/mosquito/runners/overseer.cr @@ -1,3 +1,8 @@ +require "./idle_wait" +require "./queue_list" +require "./run_at_most" +require "./runnable" + module Mosquito::Runners # The Overseer is responsible for managing: # - a `Coordinator` @@ -7,59 +12,147 @@ module Mosquito::Runners # # An overseer manages the loop that each thread or process runs. class Overseer + include IdleWait include RunAtMost + include Runnable Log = ::Log.for self - # Minimum time in seconds to wait between checking for jobs. 
- property idle_wait : Time::Span { + getter queue_list : QueueList + getter executors, coordinator, work_handout, idle_notifier + getter executor_count = 3 + + getter idle_wait : Time::Span { Mosquito.configuration.idle_wait } - property keep_running : Bool - - getter queue_list, executor, coordinator - def initialize + @idle_notifier = Channel(Bool).new + @queue_list = QueueList.new @coordinator = Coordinator.new queue_list - @executor = Executor.new queue_list + @executors = [] of Executor + @work_handout = Channel(Tuple(JobRun, Queue)).new + + executor_count.times do + @executors << build_executor + end + end + + def build_executor : Executor + Executor.new(work_handout, idle_notifier) + end - @keep_running = true + def runnable_name : String + "Overseer<#{object_id}>" end - def worker_id - "Worker [#{coordinator.instance_id}]" + def sleep + Log.trace { "Going to sleep now for #{idle_wait}" } + sleep idle_wait end - def stop - Log.info { worker_id + " is done after this job." } - @keep_running = false + def pre_run : Nil + Log.info { "Starting #{@executors.size} executors." } + @queue_list.run + @executors.each(&.run) end - # Runs the overseer workflow. - # Infinite loop. - def run - Log.info { worker_id + " clocking in." } + def post_run : Nil + Log.info { "Stopping #{@executors.size} executors." } + stopped_notifiers = executors.map do |executor| + executor.stop + end + work_handout.close + stopped_notifiers.each(&.receive) + Log.info { "All executors stopped." } + end + + # The goal for the overseer is to: + # - Ensure that the coordinator gets run frequently to schedule delayed/periodic jobs. + # - Wait for an executor to be idle, and dequeue work if possible. + # - Monitor the executor pool for unexpected termination and respawn. + def each_run : Nil + coordinator.schedule + + if work_handout.closed? || idle_notifier.closed? + Log.fatal { "Executor communication channels closed, overseer will stop." 
} + stop + return + end + + # If the queue list hasn't run at least once, it won't have any queues to + # search for + unless queue_list.state.started? + Log.debug { "Waiting for the queue list to fetch possible queues" } + return + end + + + Log.trace { "Waiting for an idle executor" } + all_executors_busy = true + + # This feature is under-documented in the crystal manual. + # This will attempt to receive from the idle notifier, but only + # wait for up to idle_wait seconds. + # + # The interrupt is necessary to remind the coordinator to schedule + # jobs. + select + when @idle_notifier.receive + Log.trace { "Found an idle executor" } + all_executors_busy = false + when timeout(idle_wait) + end - while keep_running - tick + case + when all_executors_busy + Log.trace { "No idle executors" } + when next_job_run = dequeue_job? + job_run, queue = next_job_run + Log.debug { "Dequeued job: #{job_run.id} #{queue.name}" } + work_handout.send next_job_run + else + Log.debug { "No job to dequeue" } + sleep + + # Arrival at this branch means that an executor is idle, but there were + # no jobs. The idle notification has been consumed, and it needs to be + # re-sent so that the next loop can still find the idle executor. + spawn { @idle_notifier.send true } end - Log.info { worker_id + " finished for now." } + check_for_deceased_runners end - def tick - delta = Time.measure do - queue_list.fetch - run_at_most every: 1.second, label: :coordinator do - coordinator.bloop + # Weaknesses: This implementation sometimes starves queues because it doesn't + # round robin, prioritize queues, or anything else. + def dequeue_job? : Tuple(JobRun, Queue)? + queue_list.each do |q| + if job_run = q.dequeue + return { job_run, q } end - executor.dequeue_and_run_jobs + end + end + + def check_for_deceased_runners : Nil + executors.select{|e| e.state.started?}.select(&.dead?).each do |dead_executor| + Log.fatal do + <<-MSG + Executor #{dead_executor.runnable_name} died. 
+ A new executor will be started. + MSG + end + executors.delete dead_executor + end + + (executor_count - executors.size).times do + executors << build_executor.tap(&.run) end - if delta < idle_wait - sleep(idle_wait - delta) + if queue_list.dead? + Log.fatal { "QueueList has died, overseer will stop." } + stop end end end diff --git a/src/mosquito/runners/queue_list.cr b/src/mosquito/runners/queue_list.cr index 93d4b239..90b8f3b5 100644 --- a/src/mosquito/runners/queue_list.cr +++ b/src/mosquito/runners/queue_list.cr @@ -1,24 +1,50 @@ +require "./run_at_most" +require "./runnable" +require "./idle_wait" + module Mosquito::Runners # QueueList handles searching the redis keyspace for named queues. class QueueList + Log = ::Log.for self + include RunAtMost + include Runnable + include IdleWait + + getter queues : Array(Queue) def initialize @queues = [] of Queue end - delegate each, to: @queues + def runnable_name : String + "QueueList<#{object_id}>" + end + + delegate each, to: @queues.shuffle + + def each_run : Nil + # This idle wait should be at most 1 second. Longer can cause periodic jobs + # which are specified at the second-level to be executed aperiodically. + # Shorter will generate excess noise in the redis connection. 
+ with_idle_wait(1.seconds) do + @state = State::Working - def fetch - run_at_most every: 0.25.seconds, label: :fetch_queues do |t| candidate_queues = Mosquito.backend.list_queues.map { |name| Queue.new name } - @queues = filter_queues candidate_queues + new_queue_list = filter_queues candidate_queues + + Log.notice { + queues_which_were_expected_but_not_found = @queues - new_queue_list + queues_which_have_never_been_seen = new_queue_list - @queues - Log.for("fetch_queues").debug { - if @queues.size > 0 - "found #{@queues.size} queues: #{@queues.map(&.name).join(", ")}" + if queues_which_have_never_been_seen.size > 0 + "found #{queues_which_have_never_been_seen.size} new queues: #{queues_which_have_never_been_seen.map(&.name).join(", ")}" end } + + @queues = new_queue_list + + @state = State::Idle end end @@ -29,7 +55,7 @@ module Mosquito::Runners permitted_queues.includes? queue.name end - Log.for("filter_queues").debug { + Log.for("filter_queues").notice { if filtered_queues.empty? filtered_out_queues = present_queues - filtered_queues diff --git a/src/mosquito/runners/runnable.cr b/src/mosquito/runners/runnable.cr new file mode 100644 index 00000000..e10b8ad1 --- /dev/null +++ b/src/mosquito/runners/runnable.cr @@ -0,0 +1,77 @@ +module Mosquito::Runners + enum State + Starting + Working + Idle + Stopping + Finished + + def running? + starting? || working? || idle? + end + + # ie, not starting + def started? + working? || idle? + end + end + + module Runnable + getter state : State = State::Starting + getter fiber : Fiber? + getter my_name : String { + "#{self.class.name.underscore.gsub("::", ".")}.#{self.object_id}" + } + + private def state=(state : State) + @state = state + end + + def dead? : Bool + if fiber_ = fiber + fiber_.dead? + else + false + end + end + + def run + @fiber = spawn(name: my_name) do + log = Log.for(my_name) + log.info { runnable_name + " is starting" } + + self.state = State::Working + pre_run + + while state.running? 
+ each_run + end + + post_run + self.state = State::Finished + end + end + + def stop : Channel(Bool) + self.state = State::Stopping if state.running? + notifier = Channel(Bool).new + + spawn do + start = Time.utc + while state.stopping? && (Time.utc - start) < 25.seconds + Fiber.yield + end + notifier.send state.finished? + + Log.info { runnable_name + " has stopped" } + end + + notifier + end + + abstract def runnable_name : String + abstract def each_run : Nil + def pre_run : Nil ; end + def post_run : Nil ; end + end +end diff --git a/test/helpers/logging_helper.cr b/test/helpers/logging_helper.cr index 4f33e797..22146c51 100644 --- a/test/helpers/logging_helper.cr +++ b/test/helpers/logging_helper.cr @@ -1,3 +1,5 @@ +require "log" + class TestingLogBackend < Log::MemoryBackend def self.instance @@instance ||= new @@ -42,4 +44,8 @@ class Minitest::Test end end -Log.builder.bind "*", :debug, TestingLogBackend.instance +Log.setup do |config| + config.bind "*", :debug, TestingLogBackend.instance + config.bind "redis.*", :warn, TestingLogBackend.instance + config.bind "mosquito.*", :trace, TestingLogBackend.instance +end diff --git a/test/helpers/mock_coordinator.cr b/test/helpers/mock_coordinator.cr index b5c83f82..38ae131d 100644 --- a/test/helpers/mock_coordinator.cr +++ b/test/helpers/mock_coordinator.cr @@ -1,4 +1,12 @@ class MockCoordinator < Mosquito::Runners::Coordinator + getter schedule_count + + def initialize(queue_list : Mosquito::Runners::QueueList) + super + + @schedule_count = 0 + end + def only_if_coordinator : Nil if @always_coordinator yield @@ -14,4 +22,9 @@ class MockCoordinator < Mosquito::Runners::Coordinator def always_coordinator!(always = true) @always_coordinator = always end + + def schedule + @schedule_count += 1 + super + end end diff --git a/test/helpers/mock_executor.cr b/test/helpers/mock_executor.cr index 46edaa90..0f35181e 100644 --- a/test/helpers/mock_executor.cr +++ b/test/helpers/mock_executor.cr @@ -1,2 +1,24 @@ class 
MockExecutor < Mosquito::Runners::Executor + def state=(state : Mosquito::Runners::State) + super + end + + def run + self.state = Mosquito::Runners::State::Working + end + + def stop + self.state = Mosquito::Runners::State::Stopping + Channel(Bool).new.tap do |notifier| + spawn { + self.state = Mosquito::Runners::State::Finished + notifier.send true + } + end + end + + def receive_job + job_run, queue = job_pipeline.receive + job_run + end end diff --git a/test/helpers/mock_overseer.cr b/test/helpers/mock_overseer.cr index 08139099..20834f92 100644 --- a/test/helpers/mock_overseer.cr +++ b/test/helpers/mock_overseer.cr @@ -1,3 +1,3 @@ class MockOverseer < Mosquito::Runners::Overseer - property queue_list, coordinator, executor + property queue_list, coordinator, executors, work_handout, idle_notifier end diff --git a/test/helpers/mock_queue_list.cr b/test/helpers/mock_queue_list.cr index 7a6c23c5..11c605ad 100644 --- a/test/helpers/mock_queue_list.cr +++ b/test/helpers/mock_queue_list.cr @@ -1,3 +1,4 @@ class MockQueueList < Mosquito::Runners::QueueList - getter :queues + getter queues + setter state end diff --git a/test/mosquito/queued_job_test.cr b/test/mosquito/queued_job_test.cr index a1c0cbe4..ca8489e7 100644 --- a/test/mosquito/queued_job_test.cr +++ b/test/mosquito/queued_job_test.cr @@ -36,7 +36,7 @@ describe Mosquito::QueuedJob do it "can be passed in" do clear_logs EchoJob.new("quack").perform - assert_includes logs, "quack" + assert_logs_match "quack" end it "can have a boolean false passed as a parameter (and it's not assumed to be a nil)" do diff --git a/test/mosquito/runners/executor_test.cr b/test/mosquito/runners/executor_test.cr index 09f14e76..9521a892 100644 --- a/test/mosquito/runners/executor_test.cr +++ b/test/mosquito/runners/executor_test.cr @@ -1,8 +1,10 @@ require "../../test_helper" describe "Mosquito::Runners::Executor" do + getter(executor_pipeline) { Channel(Tuple(JobRun, Queue)).new } + getter(idle_notifier) { Channel(Bool).new } 
getter(queue_list) { MockQueueList.new } - getter(executor) { Mosquito::Runners::Executor.new queue_list } + getter(executor) { MockExecutor.new executor_pipeline, idle_notifier } getter(coordinator) { Mosquito::Runners::Coordinator.new queue_list } def register(job_class : Mosquito::Job.class) @@ -13,10 +15,31 @@ describe "Mosquito::Runners::Executor" do def run_job(job_class : Mosquito::Job.class) register job_class job_class.reset_performance_counter! - job_class.new.enqueue - executor.run_next_job job_class.queue + job_run = job_class.new.enqueue + executor.execute job_run, from_queue: job_class.queue end + describe "status" do + it "starts as starting" do + assert_equal State::Starting, executor.state + end + + it "broadcasts a ping when transitioning to idle" do + executor.state = State::Idle + + select + when idle_notifier.receive + assert true + when timeout(0.5.seconds) + refute true, "Timed out waiting for idle notifier" + end + end + + it "goes idle in pre_run" do + executor.pre_run + assert_equal State::Idle, executor.state + end + end describe "running jobs" do it "runs a job from a queue" do @@ -37,7 +60,7 @@ describe "Mosquito::Runners::Executor" do FailingJob.queue.enqueue job_run Timecop.freeze now do - executor.run_next_job job.class.queue + executor.execute job_run, from_queue: job.class.queue end job_run.reload @@ -45,7 +68,7 @@ describe "Mosquito::Runners::Executor" do Timecop.freeze now + job.reschedule_interval(1) do coordinator.enqueue_delayed_jobs - executor.run_next_job job.class.queue + executor.execute job_run, from_queue: job.class.queue end job_run.reload @@ -62,7 +85,7 @@ describe "Mosquito::Runners::Executor" do job_run.store NonReschedulableFailingJob.queue.enqueue job_run - executor.run_next_job NonReschedulableFailingJob.queue + executor.execute job_run, from_queue: NonReschedulableFailingJob.queue actual_ttl = backend.expires_in job_run.config_key assert_equal executor.failed_job_ttl, actual_ttl @@ -78,7 +101,7 @@ describe 
"Mosquito::Runners::Executor" do job_run.store QueuedTestJob.queue.enqueue job_run - executor.run_next_job QueuedTestJob.queue + executor.execute job_run, from_queue: QueuedTestJob.queue assert_logs_match "Success" diff --git a/test/mosquito/runners/overseer_test.cr b/test/mosquito/runners/overseer_test.cr index bbadc667..c5527a4c 100644 --- a/test/mosquito/runners/overseer_test.cr +++ b/test/mosquito/runners/overseer_test.cr @@ -1,69 +1,107 @@ require "../../test_helper" describe "Mosquito::Runners::Overseer" do + getter(executor_pipeline) { Channel(Tuple(JobRun, Queue)).new } + getter(idle_notifier) { Channel(Bool).new } getter(queue_list) { MockQueueList.new } getter(coordinator) { MockCoordinator.new queue_list } - getter(executor) { MockExecutor.new queue_list } + getter(executor) { MockExecutor.new executor_pipeline, idle_notifier } getter(overseer : MockOverseer) { MockOverseer.new.tap do |o| o.queue_list = queue_list o.coordinator = coordinator - o.executor = executor + o.idle_notifier = idle_notifier + o.executors = [] of Mosquito::Runners::Executor + o.executor_count.times do + o.executors << executor.as(Mosquito::Runners::Executor) + end end } - describe "tick" do - it "waits the proper amount of time between cycles" do - clean_slate do - tick_time = Time.measure do - overseer.tick - end + def register(job_class : Mosquito::Job.class) + Mosquito::Base.register_job_mapping job_class.name.underscore, job_class + queue_list.queues << job_class.queue + end - assert_in_epsilon( - overseer.idle_wait.total_seconds, - tick_time.total_seconds, - epsilon: 0.02 - ) + def run_job(job_class : Mosquito::Job.class) + register job_class + job_class.reset_performance_counter! 
+ job_run = job_class.new.enqueue + executor.execute job_run, from_queue: job_class.queue + end + + describe "pre_run" do + it "runs all executors" do + overseer.executors.each do |executor| + assert_equal State::Starting, executor.state + end + overseer.pre_run + overseer.executors.each do |executor| + assert_equal State::Working, executor.state end end end - describe "run" do - it "should log a startup message" do - overseer.keep_running = false - clear_logs - overseer.run - assert_logs_match "clocking in." + describe "post_run" do + it "stops all executors" do + overseer.executors.each(&.run) + overseer.post_run + overseer.executors.each do |executor| + assert_equal State::Finished, executor.state + end end - it "should log a finished message" do - overseer.keep_running = false + it "logs messages about stopping the executors" do clear_logs - overseer.run - assert_logs_match "finished for now" + overseer.pre_run + overseer.post_run + assert_logs_match "Stopping #{overseer.executor_count} executors." + assert_logs_match "All executors stopped." end end - describe "stop" do - it "should log a stop message" do - clear_logs - overseer.stop - assert_logs_match "is done after this job." 
- end + describe "each_run" do + it "dequeues a job and dispatches it to the pipeline" do + clean_slate do + register QueuedTestJob + expected_job_run = QueuedTestJob.new.enqueue + + overseer.work_handout = Channel(Tuple(JobRun, Queue)).new - it "should set the stop flag" do - overseer.stop - assert_equal false, overseer.keep_running + queue_list.state = State::Working + executor.state = State::Idle + + # each_run will block until there's a receiver on the channel + spawn { overseer.each_run } + actual_job_run, queue = overseer.work_handout.receive + assert_equal expected_job_run, actual_job_run + assert_equal QueuedTestJob.queue, queue + end end - end - describe "worker_id" do - it "should return a unique id" do - one = Mosquito::Runners::Overseer.new - two = Mosquito::Runners::Overseer.new + it "waits #idle_wait before checking the queue again" do + clean_slate do + # an idle executor, but no jobs in the queue + executor.state = State::Idle + queue_list.state = State::Working + + tick_time = Time.measure do + overseer.each_run + end + + assert_in_epsilon( + overseer.idle_wait.total_seconds, + tick_time.total_seconds, + epsilon: 0.05 + ) + end + end - refute_equal one.worker_id, two.worker_id + it "triggers the scheduler" do + assert_equal 0, coordinator.schedule_count + overseer.each_run + assert_equal 1, coordinator.schedule_count end end end diff --git a/test/mosquito/runners/queue_list_test.cr b/test/mosquito/runners/queue_list_test.cr index c11cdae9..c918b639 100644 --- a/test/mosquito/runners/queue_list_test.cr +++ b/test/mosquito/runners/queue_list_test.cr @@ -9,20 +9,21 @@ describe "Mosquito::Runners::QueueList" do EchoJob.new(text: "hello world").enqueue end - describe "fetch" do + describe "each_run" do it "returns a list of queues" do clean_slate do enqueue_jobs - queue_list.fetch + queue_list.each_run assert_equal ["failing_job", "io_queue", "passing_job"], queue_list.queues.map(&.name).sort end end it "logs a message about the number of fetched 
queues" do clean_slate do + clear_logs enqueue_jobs - queue_list.fetch - assert_logs_match "found 3 queues" + queue_list.each_run + assert_logs_match "found 3 new queues" end end end @@ -33,7 +34,7 @@ describe "Mosquito::Runners::QueueList" do enqueue_jobs Mosquito.temp_config(run_from: ["io_queue", "passing_job"]) do - queue_list.fetch + queue_list.each_run end end @@ -45,7 +46,7 @@ describe "Mosquito::Runners::QueueList" do enqueue_jobs Mosquito.temp_config(run_from: ["test4"]) do - queue_list.fetch + queue_list.each_run end assert_logs_match "No watchable queues found." @@ -54,7 +55,7 @@ describe "Mosquito::Runners::QueueList" do it "doesnt log an error when no queues are present" do clean_slate do - queue_list.fetch + queue_list.each_run refute_logs_match "No watchable queues found." end end diff --git a/test/mosquito/runners/runnable_test.cr b/test/mosquito/runners/runnable_test.cr new file mode 100644 index 00000000..2c70c0e3 --- /dev/null +++ b/test/mosquito/runners/runnable_test.cr @@ -0,0 +1,66 @@ +class Namespace::ConcreteRunnable + include Mosquito::Runners::Runnable + + getter first_run_notifier = Channel(Bool).new + getter first_run = true + property state : Mosquito::Runners::State + + # Testing wedge which calls: run, waits for a run to happen, and then calls stop. 
+ def test_run : Nil + run + first_run_notifier.receive + stop.receive + end + + def runnable_name : String + "concrete_runnable" + end + + def each_run : Nil + if first_run + @first_run = false + first_run_notifier.send true + end + Fiber.yield + end + + def stop + first_run_notifier.close + super + end +end + +describe Mosquito::Runners::Runnable do + let(:runnable) { Namespace::ConcreteRunnable.new } + + it "builds a my_name" do + assert_equal "namespace.concrete_runnable.#{runnable.object_id}", runnable.my_name + end + + describe "run" do + it "should log a startup message" do + clear_logs + runnable.test_run + assert_logs_match "concrete_runnable is starting" + end + + it "should log a finished message" do + clear_logs + runnable.test_run + assert_logs_match "concrete_runnable has stopped" + end + end + + describe "stop" do + it "should set the stopping flag" do + runnable.state = Mosquito::Runners::State::Working + notifier = runnable.stop + assert_equal Mosquito::Runners::State::Stopping, runnable.state + end + + it "should set the finished flag" do + runnable.test_run + assert_equal Mosquito::Runners::State::Finished, runnable.state + end + end +end