Skip to content

Commit

Permalink
provides for multiple executors operating under a single runner
Browse files Browse the repository at this point in the history
  • Loading branch information
robacarp committed Dec 10, 2023
1 parent 39ddc84 commit 05c6ed1
Show file tree
Hide file tree
Showing 18 changed files with 414 additions and 127 deletions.
17 changes: 16 additions & 1 deletion demo/run.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ require "../src/mosquito"

Mosquito.configure do |settings|
settings.redis_url = ENV["REDIS_URL"]? || "redis://localhost:6379/3"
settings.idle_wait = 1.second
end

Mosquito.configuration.backend.flush

Log.setup do |c|
backend = Log::IOBackend.new

c.bind "redis.*", :debug, backend
c.bind "mosquito.*", :trace, backend
end

require "./jobs/*"

def expect_run_count(klass, expected)
Expand All @@ -17,8 +25,15 @@ def expect_run_count(klass, expected)
end
end

stopping = false
Signal::INT.trap do
if stopping
puts "SIGINT received again, crash-exiting."
exit 1
end

Mosquito::Runner.stop
stopping = true
end

Mosquito::Runner.start(spin: false)
Expand All @@ -29,7 +44,7 @@ while count <= 20 && Mosquito::Runner.keep_running
count += 1
end

Mosquito::Runner.stop
Mosquito::Runner.stop(wait: true)

puts "End of demo."
puts "----------------------------------"
Expand Down
4 changes: 2 additions & 2 deletions src/mosquito/configuration.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ module Mosquito
property failed_job_ttl : Int32 = 86400

@[Deprecated("cron scheduling can now handled automatically. See https://github.com/mosquito-cr/mosquito/pull/108")]
property run_cron_scheduler : Bool = true
property use_distributed_lock : Bool = false
property run_cron_scheduler : Bool = false
property use_distributed_lock : Bool = true

property run_from : Array(String) = [] of String
property backend : Mosquito::Backend.class = Mosquito::RedisBackend
Expand Down
37 changes: 18 additions & 19 deletions src/mosquito/runner.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,46 @@ module Mosquito
class Runner
Log = ::Log.for self

# Should mosquito continue working?
class_property keep_running : Bool = true

# Start the mosquito runner.
#
# If spin = true (default) the function will not return until the runner is
# shut down. Otherwise it will return immediately.
#
def self.start(spin = true) Log.notice { "Mosquito is buzzing..." }
def self.start(spin = true)
Log.notice { "Mosquito is buzzing..." }
instance.run

while spin && @@keep_running
sleep 1
while spin && keep_running
Fiber.yield
end
end

def self.keep_running : Bool
instance.state.starting? || instance.state.running?
end

# Request the mosquito runner stop. The runner will not abort the current job
# but it will not start any new jobs.
def self.stop
def self.stop(wait = false)
Log.notice { "Mosquito is shutting down..." }
self.keep_running = false
instance.stop
finished_notifier = instance.stop

if wait
finished_notifier.receive
end
end

private def self.instance : self
@@instance ||= new
end

delegate run, stop, state, to: @overseer
delegate running?, to: @overseer.state
getter overseer : Runners::Overseer

def initialize
Mosquito.configuration.validate
@overseer = Runners::Overseer.new
end

def run
spawn do
@overseer.run
end
end

def stop
@overseer.stop
end
end
end
14 changes: 9 additions & 5 deletions src/mosquito/runners/coordinator.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Mosquito::Runners
# primer? loader?
# primer? loader? _scheduler_
class Coordinator
Log = ::Log.for self
LockTTL = 10.seconds
Expand All @@ -15,7 +15,11 @@ module Mosquito::Runners
@emitted_scheduling_deprecation_runtime_message = false
end

def bloop
def runnable_name : String
"Coordinator<#{object_id}>"
end

def schedule : Nil
only_if_coordinator do
enqueue_periodic_jobs
enqueue_delayed_jobs
Expand All @@ -41,13 +45,13 @@ module Mosquito::Runners
end

if Mosquito.backend.lock? lock_key, instance_id, LockTTL
Log.debug { "Coordinator lock acquired" }
Log.trace { "Coordinator lock acquired" }
duration = Time.measure do
yield
end

Mosquito.backend.unlock lock_key, instance_id
Log.debug { "Coordinator lock released" }
Log.trace { "Coordinator lock released" }
end

return unless duration > LockTTL
Expand All @@ -58,7 +62,7 @@ module Mosquito::Runners
Base.scheduled_job_runs.each do |scheduled_job_run|
enqueued = scheduled_job_run.try_to_execute

Log.for("enqueue_periodic_jobs").info {
Log.for("enqueue_periodic_jobs").notice {
"enqueued #{scheduled_job_run.class}" if enqueued
}
end
Expand Down
38 changes: 25 additions & 13 deletions src/mosquito/runners/executor.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
require "./run_at_most"
require "./runnable"

module Mosquito::Runners
# An Executor is responsible for building Job classes with deserialized
# parameters and calling #run on them. It measures the time it takes to
Expand All @@ -7,38 +10,47 @@ module Mosquito::Runners
# if it will return only after a relative eternity.
class Executor
include RunAtMost
include Runnable

Log = ::Log.for self
getter log : ::Log

# How long a job config is persisted after success
property successful_job_ttl : Int32 { Mosquito.configuration.successful_job_ttl }

# How long a job config is persisted after failure
property failed_job_ttl : Int32 { Mosquito.configuration.failed_job_ttl }
property state : State = State::Starting

getter queue_list : QueueList
getter job_pipeline : Channel(Tuple(JobRun, Queue))

def initialize(@queue_list)
def initialize(@job_pipeline)
@log = Log.for(object_id.to_s)
end

def dequeue_and_run_jobs
queue_list.each do |q|
run_next_job q
end
def runnable_name : String
"Executor<#{object_id}>"
end

def run_next_job(q : Queue)
job_run = q.dequeue
return unless job_run
def each_run : Nil
dequeue = job_pipeline.receive?
return if dequeue.nil?

Log.notice { "#{"Starting:".colorize.magenta} #{job_run} from #{q.name}" }
job_run, queue = dequeue
@state = State::Working
execute job_run, queue
@state = State::Idle
end

def execute(job_run : JobRun, from_queue q : Queue)
log.info { "#{"Starting:".colorize.magenta} #{job_run} from #{q.name}" }

duration = Time.measure do
job_run.run
end.total_seconds

if job_run.succeeded?
Log.notice { "#{"Success:".colorize.green} #{job_run} finished and took #{time_with_units duration}" }
log.info { "#{"Success:".colorize.green} #{job_run} finished and took #{time_with_units duration}" }
q.forget job_run
job_run.delete in: successful_job_ttl

Expand All @@ -60,14 +72,14 @@ module Mosquito::Runners
message << " (at "
message << next_execution
message << ")"
log.warn { message.to_s }
else
q.banish job_run
job_run.delete in: failed_job_ttl

message << "cannot be rescheduled".colorize.yellow
log.error { message.to_s }
end

Log.warn { message.to_s }
end
end

Expand Down
14 changes: 14 additions & 0 deletions src/mosquito/runners/idle_wait.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module Mosquito::Runners
module IdleWait
def with_idle_wait(idle_wait : Time::Span)
delta = Time.measure do
yield
end

if delta < idle_wait
# Fiber.timeout(idle_wait - delta)
sleep(idle_wait - delta)
end
end
end
end
Loading

0 comments on commit 05c6ed1

Please sign in to comment.