Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup: moves most console log statements into Observability classes #152

Merged
merged 5 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- (minor breaking) Logs are now emitted from runners with a slighly different source tag. (#152)
For example:
The overseer boot message used to be:
`INFO - mosquito.runners.overseer.4315742080: Overseer<4315742080> is starting`
Now the message is simply:
`INFO - mosquito.overseer: starting`

## [2.0.0]
### Added
- Adds a test backend, which can be used to inspect jobs that were enqueued and
Expand Down
1 change: 0 additions & 1 deletion demo/run.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ Log.setup do |c|

c.bind "redis.*", :warn, backend
c.bind "mosquito.*", :debug, backend
c.bind "mosquito.runners.overseer", :trace, backend
end

require "./jobs/*"
Expand Down
22 changes: 11 additions & 11 deletions spec/mosquito/api/executor_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ require "../../spec_helper"
describe Mosquito::Api::Executor do
let(executor_pipeline) { Channel(Tuple(Mosquito::JobRun, Mosquito::Queue)).new }
let(idle_notifier) { Channel(Bool).new }
let(job_run_id) { "job_run_id" }
let(queue_name) { "a queue" }
let(job_run) { Mosquito::JobRun.new "job_run", Time.utc, job_run_id }
let(queue) { Mosquito::Queue.new queue_name }
let(job) { QueuedTestJob.new }
let(job_run : Mosquito::JobRun) { job.enqueue }

let(executor) { MockExecutor.new executor_pipeline, idle_notifier }
let(api) { Mosquito::Api::Executor.new executor.object_id.to_s }
let(observer) { Mosquito::Observability::Executor.new executor }

it "can read the current job and queue after being started" do
observer.start job_run, queue
assert_equal job_run_id, api.current_job
assert_equal queue_name, api.current_job_queue
end
it "can read the current job and queue after being started, and clears it after" do
Mosquito::Base.register_job_mapping job.class.name.underscore, job.class
job_run.store
job_run.build_job

observer.execute job_run, job.class.queue do
assert_equal job_run.id, api.current_job
assert_equal job.class.queue.name, api.current_job_queue
end

it "clears the current job and queue after being started" do
observer.finish true
assert api.current_job.nil?
assert api.current_job_queue.nil?
end
Expand Down
12 changes: 7 additions & 5 deletions spec/mosquito/backend/overseer_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ require "../../spec_helper"

describe Mosquito::Backend do
it "can keep a list of overseers" do
overseer_ids = ["overseer1", "overseer2", "overseer3"]
overseer_ids.each do |overseer_id|
Mosquito.backend.register_overseer overseer_id
end
clean_slate do
overseer_ids = ["overseer1", "overseer2", "overseer3"]
overseer_ids.each do |overseer_id|
Mosquito.backend.register_overseer overseer_id
end

assert_equal overseer_ids, Mosquito.backend.list_overseers
assert_equal overseer_ids, Mosquito.backend.list_overseers
end
end
end
5 changes: 0 additions & 5 deletions spec/mosquito/runners/executor_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,6 @@ describe "Mosquito::Runners::Executor" do
end
end

it "broadcasts a heartbeat to the observer" do
run_job QueuedTestJob
assert api.heartbeat
end

it "tells the observer what it's working on" do
SleepyJob.should_sleep = true
job = SleepyJob.new
Expand Down
2 changes: 1 addition & 1 deletion spec/mosquito/runners/overseer_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe "Mosquito::Runners::Overseer" do
clear_logs
overseer.pre_run
overseer.post_run
assert_logs_match "Stopping #{overseer.executor_count} executors."
assert_logs_match "Stopping executors."
assert_logs_match "All executors stopped."
end
end
Expand Down
2 changes: 2 additions & 0 deletions src/mosquito.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ require "./mosquito/runners/run_at_most"
require "./mosquito/**"

module Mosquito
Log = ::Log.for self

def self.backend
configuration.backend
end
Expand Down
60 changes: 57 additions & 3 deletions src/mosquito/api/executor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,78 @@ module Mosquito

module Observability
class Executor
private getter log : ::Log
def self.metadata_key(instance_id : String) : String
Backend.build_key "executor", instance_id
end

def initialize(executor : Mosquito::Runners::Executor)
@metadata = Metadata.new self.class.metadata_key executor.object_id.to_s
@log = Log.for(executor.runnable_name)
end

def start(job_run : JobRun, from_queue : Queue)
def execute(job_run : JobRun, from_queue : Queue)
@metadata["current_job"] = job_run.id
@metadata["current_job_queue"] = from_queue.name
end
log.info { "#{"Starting:".colorize.magenta} #{job_run} from #{from_queue.name}" }

duration = Time.measure do
yield
end

if job_run.succeeded?
log_success_message job_run, duration
else
log_failure_message job_run, duration
end

def finish(success : Bool)
@metadata["current_job"] = nil
@metadata["current_job_queue"] = nil
end

def log_success_message(job_run : JobRun, duration : Time::Span)
log.info { "#{"Success:".colorize.green} #{job_run} finished and took #{time_with_units duration}" }
end

def log_failure_message(job_run : JobRun, duration : Time::Span)
message = String::Builder.new
message << "Failure: ".colorize.red
message << job_run
message << " failed, taking "
message << time_with_units duration
message << " and "

if job_run.rescheduleable?
next_execution = Time.utc + job_run.reschedule_interval
message << "will run again".colorize.cyan
message << " in "
message << job_run.reschedule_interval
message << " (at "
message << next_execution
message << ")"
log.warn { message.to_s }
else
message << "cannot be rescheduled".colorize.yellow
log.error { message.to_s }
end
end

# :nodoc:
private def time_with_units(duration : Time::Span)
seconds = duration.total_seconds
if seconds > 0.1
"#{(seconds).*(100).trunc./(100)}s".colorize.red
elsif seconds > 0.001
"#{(seconds * 1_000).trunc}ms".colorize.yellow
elsif seconds > 0.000_001
"#{(seconds * 100_000).trunc}µs".colorize.green
elsif seconds > 0.000_000_001
"#{(seconds * 1_000_000_000).trunc}ns".colorize.green
else
"no discernible time at all".colorize.green
end
end

delegate heartbeat!, to: @metadata
end
end
Expand Down
28 changes: 27 additions & 1 deletion src/mosquito/api/overseer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,33 @@ module Mosquito
class Observability::Overseer
getter metadata : Metadata
getter instance_id : String
private getter overseer : Runners::Overseer
private getter log : ::Log

def self.metadata_key(instance_id : String) : String
Mosquito::Backend.build_key "overseer", instance_id
end

def initialize(overseer : Runners::Overseer)
def initialize(@overseer : Runners::Overseer)
@instance_id = overseer.object_id.to_s
@log = Log.for(overseer.runnable_name)
@metadata = Metadata.new self.class.metadata_key(instance_id)
end

def starting
log.info { "Starting #{overseer.executor_count} executors." }
heartbeat
end

def stopping
log.info { "Stopping executors." }
end

def stopped
log.info { "All executors stopped." }
log.info { "Overseer #{instance_id} finished for now." }
end

def heartbeat
# (Re)registers the overseer with the backend.
Mosquito.backend.register_overseer self.instance_id
Expand All @@ -50,6 +67,15 @@ module Mosquito
metadata.heartbeat!
end

def executor_died(executor : Runners::Executor) : Nil
log.fatal do
<<-MSG
Executor #{executor.runnable_name} died.
A new executor will be started.
MSG
end
end

def update_executor_list(executors : Array(Runners::Executor)) : Nil
metadata["executors"] = executors.map(&.object_id).join(",")
end
Expand Down
7 changes: 4 additions & 3 deletions src/mosquito/runnable.cr
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ module Mosquito
"#{self.class.name.underscore.gsub("::", ".")}.#{self.object_id}"
}

private getter log : ::Log { Log.for runnable_name }

private def state=(state : State)
@state = state
end
Expand All @@ -119,8 +121,7 @@ module Mosquito
# State can be altered internally or externally to cause it to exit
# but the cleanest way to do that is to call #stop.
def run
log = Log.for(my_name)
@fiber = spawn(name: my_name) do
@fiber = spawn(name: runnable_name) do
log.info { runnable_name + " is starting" }

self.state = State::Working
Expand Down Expand Up @@ -157,7 +158,7 @@ module Mosquito
end
notifier.send state.finished?

Log.info { runnable_name + " has stopped" }
log.info { runnable_name + " has stopped" }
end

notifier
Expand Down
2 changes: 1 addition & 1 deletion src/mosquito/runners/coordinator.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module Mosquito::Runners
end

def runnable_name : String
"Coordinator<#{object_id}>"
"coordinator.#{object_id}"
end

def schedule : Nil
Expand Down
53 changes: 3 additions & 50 deletions src/mosquito/runners/executor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ module Mosquito::Runners
include RunAtMost
include Runnable

Log = ::Log.for self
getter log : ::Log

# How long a job config is persisted after success
property successful_job_ttl : Int32 { Mosquito.configuration.successful_job_ttl }

Expand All @@ -53,12 +50,11 @@ module Mosquito::Runners
end

def initialize(@job_pipeline, @idle_bell)
@log = Log.for(object_id.to_s)
end

# :nodoc:
def runnable_name : String
"Executor<#{object_id}>"
"executor.#{object_id}"
end

# :nodoc:
Expand Down Expand Up @@ -89,65 +85,22 @@ module Mosquito::Runners
# Execution time is measured and logged, and the job is either forgotten
# or, if it fails, rescheduled.
def execute(job_run : JobRun, from_queue q : Queue)
log.info { "#{"Starting:".colorize.magenta} #{job_run} from #{q.name}" }
observer.start job_run, q

duration = Time.measure do
observer.execute job_run, q do
job_run.run
end.total_seconds

observer.finish job_run.succeeded?
end

if job_run.succeeded?
log.info { "#{"Success:".colorize.green} #{job_run} finished and took #{time_with_units duration}" }
q.forget job_run
job_run.delete in: successful_job_ttl

else
message = String::Builder.new
message << "Failure: ".colorize.red
message << job_run
message << " failed, taking "
message << time_with_units duration
message << " and "

if job_run.rescheduleable?
next_execution = Time.utc + job_run.reschedule_interval
q.reschedule job_run, next_execution

message << "will run again".colorize.cyan
message << " in "
message << job_run.reschedule_interval
message << " (at "
message << next_execution
message << ")"
log.warn { message.to_s }
else
q.banish job_run
job_run.delete in: failed_job_ttl

message << "cannot be rescheduled".colorize.yellow
log.error { message.to_s }
end
end

observer.heartbeat!
end

# :nodoc:
def time_with_units(seconds : Float64)
if seconds > 0.1
"#{(seconds).*(100).trunc./(100)}s".colorize.red
elsif seconds > 0.001
"#{(seconds * 1_000).trunc}ms".colorize.yellow
elsif seconds > 0.000_001
"#{(seconds * 100_000).trunc}µs".colorize.green
elsif seconds > 0.000_000_001
"#{(seconds * 1_000_000_000).trunc}ns".colorize.green
else
"no discernible time at all".colorize.green
end
end

end
end
Loading