Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API - executors now publish events #154

Merged
merged 1 commit into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions spec/helpers/global_helpers.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module TestHelpers
backend.flush

TestingLogBackend.instance.clear
PubSub.instance.clear
yield
end
end
Expand Down
52 changes: 52 additions & 0 deletions spec/helpers/pub_sub.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
class PubSub
def self.instance
@@instance ||= new
end

def self.eavesdrop : Array(Mosquito::Backend::BroadcastMessage)
instance.receive_messages
yield
instance.stop_listening
instance.messages
end

getter messages = [] of Mosquito::Backend::BroadcastMessage
@channel = Channel(Mosquito::Backend::BroadcastMessage).new
@stopping_channel = Channel(Bool).new

def initialize
end

def receive_messages
@continue_receiving = true
spawn receive_loop
@channel = Mosquito.backend.subscribe "mosquito:*"
end

def stop_listening
@continue_receiving = false
end

def receive_loop
loop do
break unless @continue_receiving
select
when message = @channel.receive
@messages << message
when timeout(500.milliseconds)
end
end
@channel.close
end

delegate clear, to: @messages

module Helpers
delegate eavesdrop, to: PubSub
def assert_message_received(matcher : Regex) : Nil
PubSub.instance.messages.find do |message|
matcher === message.message
end
end
end
end
13 changes: 13 additions & 0 deletions spec/mosquito/api/executor_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,17 @@ describe Mosquito::Api::Executor do
# the heartbeat is stored as a unix epoch without millis
assert_equal now.at_beginning_of_second, api.heartbeat
end

it "publishes job started/finished events" do
job_run.store
job_run.build_job

eavesdrop do
observer.execute job_run, job.class.queue do
end
end

assert_message_received /job-started/
assert_message_received /job-finished/
end
end
3 changes: 3 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ Mosquito.configure do |settings|
end

require "./helpers/*"
class Minitest::Test
include PubSub::Helpers
end

Mosquito.configuration.backend.flush

Expand Down
4 changes: 4 additions & 0 deletions src/mosquito/api.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./backend"
require "./api/observability/*"
require "./api/*"

module Mosquito::Api
Expand All @@ -24,4 +25,7 @@ module Mosquito::Api
.map { |name| Overseer.new name }
end

def self.event_receiver : Channel(Backend::BroadcastMessage)
Mosquito.backend.subscribe "mosquito:*"
end
end
10 changes: 10 additions & 0 deletions src/mosquito/api/executor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ module Mosquito

module Observability
class Executor
include Publisher

private getter log : ::Log
def self.metadata_key(instance_id : String) : String
Backend.build_key "executor", instance_id
Expand All @@ -43,12 +45,19 @@ module Mosquito
def initialize(executor : Mosquito::Runners::Executor)
@metadata = Metadata.new self.class.metadata_key executor.object_id.to_s
@log = Log.for(executor.runnable_name)
@publish_context = PublishContext.new [:executor, executor.object_id]
end

def execute(job_run : JobRun, from_queue : Queue)
@metadata["current_job"] = job_run.id
@metadata["current_job_queue"] = from_queue.name
log.info { "#{"Starting:".colorize.magenta} #{job_run} from #{from_queue.name}" }
publish({
event: "job-started",
job_run: job_run.id,
from_queue: from_queue.name,
# expected_duration_ms: expected_duration
})

duration = Time.measure do
yield
Expand All @@ -60,6 +69,7 @@ module Mosquito
log_failure_message job_run, duration
end

publish({event: "job-finished", job_run: job_run.id})
@metadata["current_job"] = nil
@metadata["current_job_queue"] = nil
end
Expand Down
27 changes: 27 additions & 0 deletions src/mosquito/api/observability/publisher.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module Mosquito::Observability::Publisher
getter publish_context : PublishContext

def publish(data : NamedTuple)
Log.debug { "Publishing #{data} to #{@publish_context.originator}" }
Mosquito.backend.publish(
publish_context.originator,
data.to_json
)
end

class PublishContext
alias Context = Array(String | Symbol | UInt64)
property originator : String
property context : String

def initialize(context : Context)
@context = KeyBuilder.build context
@originator = KeyBuilder.build "mosquito", @context
end

def initialize(parent : self, context : Context)
@context = KeyBuilder.build context
@originator = KeyBuilder.build "mosquito", parent.context, context
end
end
end
10 changes: 10 additions & 0 deletions src/mosquito/backend.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
module Mosquito
abstract class Backend
struct BroadcastMessage
property channel : String
property message : String

def initialize(@channel, @message)
end
end

QUEUES = %w(waiting scheduled pending dead)

KEY_PREFIX = {"mosquito"}
Expand Down Expand Up @@ -43,6 +51,8 @@ module Mosquito

abstract def unlock(key : String, value : String) : Nil
abstract def lock?(key : String, value : String, ttl : Time::Span) : Bool
abstract def publish(key : String, value : String) : Nil
abstract def subscribe(key : String) : Channel(BroadcastMessage)
end

macro inherited
Expand Down
2 changes: 2 additions & 0 deletions src/mosquito/base.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require "json"

module Mosquito
alias Id = Int64 | Int32

Expand Down
27 changes: 27 additions & 0 deletions src/mosquito/redis_backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,33 @@ module Mosquito
remove_matching_key keys: [key], args: [value]
end

def self.publish(key : String, value : String) : Nil
redis.publish key, value
end

def self.subscribe(key : String) : Channel(Backend::BroadcastMessage)
stream = Channel(Backend::BroadcastMessage).new

spawn do
redis.psubscribe(key) do |subscription, connection|
subscription.on_message do |channel, message|
if stream.closed?
connection.unsubscribe channel
else
stream.send(
Backend::BroadcastMessage.new(
channel: channel,
message: message
)
)
end
end
end
end

stream
end

def schedule(job_run : JobRun, at scheduled_time : Time) : JobRun
redis.zadd scheduled_q, scheduled_time.to_unix_ms.to_s, job_run.id
job_run
Expand Down
7 changes: 7 additions & 0 deletions src/mosquito/test_backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ module Mosquito
def self.unlock(key : String, value : String) : Nil
end

def self.publish(key : String, value : String) : Nil
end

def self.subscribe(key : String) : Channel(BroadcastMessage)
Channel(BroadcastMessage).new
end

struct EnqueuedJob
getter id : String
getter klass : Mosquito::Job.class
Expand Down