diff --git a/spec/helpers/global_helpers.cr b/spec/helpers/global_helpers.cr index 60ba15f..49bc777 100644 --- a/spec/helpers/global_helpers.cr +++ b/spec/helpers/global_helpers.cr @@ -9,6 +9,7 @@ module TestHelpers backend.flush TestingLogBackend.instance.clear + PubSub.instance.clear yield end end diff --git a/spec/helpers/pub_sub.cr b/spec/helpers/pub_sub.cr new file mode 100644 index 0000000..47d3915 --- /dev/null +++ b/spec/helpers/pub_sub.cr @@ -0,0 +1,52 @@ +class PubSub + def self.instance + @@instance ||= new + end + + def self.eavesdrop : Array(Mosquito::Backend::BroadcastMessage) + instance.receive_messages + yield + instance.stop_listening + instance.messages + end + + getter messages = [] of Mosquito::Backend::BroadcastMessage + @channel = Channel(Mosquito::Backend::BroadcastMessage).new + @stopping_channel = Channel(Bool).new + + def initialize + end + + def receive_messages + @continue_receiving = true + spawn receive_loop + @channel = Mosquito.backend.subscribe "mosquito:*" + end + + def stop_listening + @continue_receiving = false + end + + def receive_loop + loop do + break unless @continue_receiving + select + when message = @channel.receive + @messages << message + when timeout(500.milliseconds) + end + end + @channel.close + end + + delegate clear, to: @messages + + module Helpers + delegate eavesdrop, to: PubSub + def assert_message_received(matcher : Regex) : Nil + PubSub.instance.messages.find do |message| + matcher === message.message + end + end + end +end diff --git a/spec/mosquito/api/executor_spec.cr b/spec/mosquito/api/executor_spec.cr index 51ee95c..f9998c0 100644 --- a/spec/mosquito/api/executor_spec.cr +++ b/spec/mosquito/api/executor_spec.cr @@ -37,4 +37,17 @@ describe Mosquito::Api::Executor do # the heartbeat is stored as a unix epoch without millis assert_equal now.at_beginning_of_second, api.heartbeat end + + it "publishes job started/finished events" do + job_run.store + job_run.build_job + + eavesdrop do + observer.execute job_run, job.class.queue do + end + end + + assert_message_received /job-started/ + assert_message_received /job-finished/ + end end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 7b92261..af8e7d5 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -13,6 +13,9 @@ Mosquito.configure do |settings| end require "./helpers/*" +class Minitest::Test + include PubSub::Helpers +end Mosquito.configuration.backend.flush diff --git a/src/mosquito/api.cr b/src/mosquito/api.cr index c83b2c9..b53d4f3 100644 --- a/src/mosquito/api.cr +++ b/src/mosquito/api.cr @@ -1,4 +1,5 @@ require "./backend" +require "./api/observability/*" require "./api/*" module Mosquito::Api @@ -24,4 +25,7 @@ module Mosquito::Api .map { |name| Overseer.new name } end + def self.event_receiver : Channel(Backend::BroadcastMessage) + Mosquito.backend.subscribe "mosquito:*" + end end diff --git a/src/mosquito/api/executor.cr b/src/mosquito/api/executor.cr index d8b4009..96b0ae9 100644 --- a/src/mosquito/api/executor.cr +++ b/src/mosquito/api/executor.cr @@ -35,6 +35,8 @@ module Mosquito module Observability class Executor + include Publisher + private getter log : ::Log def self.metadata_key(instance_id : String) : String Backend.build_key "executor", instance_id @@ -43,12 +45,19 @@ module Mosquito def initialize(executor : Mosquito::Runners::Executor) @metadata = Metadata.new self.class.metadata_key executor.object_id.to_s @log = Log.for(executor.runnable_name) + @publish_context = PublishContext.new [:executor, executor.object_id] end def execute(job_run : JobRun, from_queue : Queue) @metadata["current_job"] = job_run.id @metadata["current_job_queue"] = from_queue.name log.info { "#{"Starting:".colorize.magenta} #{job_run} from #{from_queue.name}" } + publish({ + event: "job-started", + job_run: job_run.id, + from_queue: from_queue.name, + # expected_duration_ms: expected_duration + }) duration = Time.measure do yield @@ -60,6 +69,7 @@ module Mosquito log_failure_message job_run, duration end + publish({event: "job-finished", job_run: job_run.id}) @metadata["current_job"] = nil @metadata["current_job_queue"] = nil end diff --git a/src/mosquito/api/observability/publisher.cr b/src/mosquito/api/observability/publisher.cr new file mode 100644 index 0000000..09274bc --- /dev/null +++ b/src/mosquito/api/observability/publisher.cr @@ -0,0 +1,27 @@ +module Mosquito::Observability::Publisher + getter publish_context : PublishContext + + def publish(data : NamedTuple) + Log.debug { "Publishing #{data} to #{@publish_context.originator}" } + Mosquito.backend.publish( + publish_context.originator, + data.to_json + ) + end + + class PublishContext + alias Context = Array(String | Symbol | UInt64) + property originator : String + property context : String + + def initialize(context : Context) + @context = KeyBuilder.build context + @originator = KeyBuilder.build "mosquito", @context + end + + def initialize(parent : self, context : Context) + @context = KeyBuilder.build context + @originator = KeyBuilder.build "mosquito", parent.context, context + end + end +end diff --git a/src/mosquito/backend.cr b/src/mosquito/backend.cr index ad02382..79a6037 100644 --- a/src/mosquito/backend.cr +++ b/src/mosquito/backend.cr @@ -1,5 +1,13 @@ module Mosquito abstract class Backend + struct BroadcastMessage + property channel : String + property message : String + + def initialize(@channel, @message) + end + end + QUEUES = %w(waiting scheduled pending dead) KEY_PREFIX = {"mosquito"} @@ -43,6 +51,8 @@ module Mosquito abstract def unlock(key : String, value : String) : Nil abstract def lock?(key : String, value : String, ttl : Time::Span) : Bool + abstract def publish(key : String, value : String) : Nil + abstract def subscribe(key : String) : Channel(BroadcastMessage) end macro inherited diff --git a/src/mosquito/base.cr b/src/mosquito/base.cr index e807167..b49ab4d 100644 --- a/src/mosquito/base.cr +++ b/src/mosquito/base.cr @@ -1,3 +1,5 @@ +require "json" + module Mosquito alias Id = Int64 | Int32 diff --git a/src/mosquito/redis_backend.cr b/src/mosquito/redis_backend.cr index fa50d78..4ac348b 100644 --- a/src/mosquito/redis_backend.cr +++ b/src/mosquito/redis_backend.cr @@ -161,6 +161,33 @@ module Mosquito remove_matching_key keys: [key], args: [value] end + def self.publish(key : String, value : String) : Nil + redis.publish key, value + end + + def self.subscribe(key : String) : Channel(Backend::BroadcastMessage) + stream = Channel(Backend::BroadcastMessage).new + + spawn do + redis.psubscribe(key) do |subscription, connection| + subscription.on_message do |channel, message| + if stream.closed? + connection.unsubscribe channel + else + stream.send( + Backend::BroadcastMessage.new( + channel: channel, + message: message + ) + ) + end + end + end + end + + stream + end + def schedule(job_run : JobRun, at scheduled_time : Time) : JobRun redis.zadd scheduled_q, scheduled_time.to_unix_ms.to_s, job_run.id job_run diff --git a/src/mosquito/test_backend.cr b/src/mosquito/test_backend.cr index 15035a2..2789a76 100644 --- a/src/mosquito/test_backend.cr +++ b/src/mosquito/test_backend.cr @@ -98,6 +98,13 @@ module Mosquito def self.unlock(key : String, value : String) : Nil end + def self.publish(key : String, value : String) : Nil + end + + def self.subscribe(key : String) : Channel(BroadcastMessage) + Channel(BroadcastMessage).new + end + struct EnqueuedJob getter id : String getter klass : Mosquito::Job.class