Skip to content

Commit

Permalink
Refactor Adapter to reference jobs, not executions
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Jul 16, 2024
1 parent d807c21 commit 6fb7f97
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 71 deletions.
6 changes: 3 additions & 3 deletions app/models/good_job/base_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
instrument_payload[:job] = job
job.save!

CurrentThread.execution_retried = (job if retried)
CurrentThread.retried_job = job if retried

active_job.provider_job_id = job.id
raise "These should be equal" if active_job.provider_job_id != active_job.job_id
Expand Down Expand Up @@ -463,10 +463,10 @@ def perform(lock_id:)
instrument_payload.merge!(
value: value,
handled_error: handled_error,
retried: current_thread.execution_retried.present?,
retried: current_thread.retried_job.present?,
error_event: error_event
)
ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried: current_thread.execution_retried)
ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried_job: current_thread.retried_job)
rescue StandardError => e
error_event = if e.is_a?(GoodJob::InterruptError)
:interrupted
Expand Down
12 changes: 6 additions & 6 deletions app/models/good_job/execution_result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ class ExecutionResult
attr_reader :error_event
# @return [Boolean, nil]
attr_reader :unexecutable
# @return [GoodJob::Execution, nil]
attr_reader :retried
# @return [GoodJob::Job, nil]
attr_reader :retried_job

# @param value [Object, nil]
# @param handled_error [Exception, nil]
# @param unhandled_error [Exception, nil]
# @param error_event [String, nil]
# @param unexecutable [Boolean, nil]
# @param retried [Boolean, nil]
def initialize(value:, handled_error: nil, unhandled_error: nil, error_event: nil, unexecutable: nil, retried: nil)
# @param retried_job [GoodJob::Job, nil]
def initialize(value:, handled_error: nil, unhandled_error: nil, error_event: nil, unexecutable: nil, retried_job: nil)
@value = value
@handled_error = handled_error
@unhandled_error = unhandled_error
@error_event = error_event
@unexecutable = unexecutable
@retried = retried
@retried_job = retried_job
end

# @return [Boolean]
Expand All @@ -38,7 +38,7 @@ def succeeded?

# @return [Boolean]
def retried?
retried.present?
retried_job.present?
end
end
end
106 changes: 50 additions & 56 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,70 +57,55 @@ def enqueue_all(active_jobs)

Rails.application.executor.wrap do
current_time = Time.current
executions = active_jobs.map do |active_job|
jobs = active_jobs.map do |active_job|
GoodJob::Job.build_for_enqueue(active_job).tap do |job|
job.scheduled_at = current_time if job.scheduled_at == job.created_at
job.created_at = current_time
job.updated_at = current_time
end
end

inline_executions = []
inline_jobs = []
GoodJob::Job.transaction(requires_new: true, joinable: false) do
execution_attributes = executions.map(&:attributes)
results = GoodJob::Job.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations
job_attributes = jobs.map(&:attributes)
results = GoodJob::Job.insert_all(job_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations

job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] }
active_jobs.each do |active_job|
active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id]
active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=)
end
executions.each do |execution|
execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id]
jobs.each do |job|
job.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[job.active_job_id]
end
executions = executions.select(&:persisted?) # prune unpersisted executions
jobs = jobs.select(&:persisted?) # prune unpersisted jobs

if execute_inline?
inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) }
inline_executions.each(&:advisory_lock!)
inline_jobs = jobs.select { |job| (job.scheduled_at.nil? || job.scheduled_at <= Time.current) }
inline_jobs.each(&:advisory_lock!)
end
end

@capsule.tracker.register
begin
until inline_executions.empty?
begin
inline_execution = inline_executions.shift
inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock)

retried_execution = inline_result.retried
while retried_execution && retried_execution.scheduled_at <= Time.current
inline_execution = retried_execution
inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock)
retried_execution = inline_result.retried
end
ensure
inline_execution.advisory_unlock
inline_execution.run_callbacks(:perform_unlocked)
if inline_jobs.any?
@capsule.tracker.register do
until inline_jobs.empty?
inline_job = inline_jobs.shift
perform_inline(inline_job, notify: false)
end
raise inline_result.unhandled_error if inline_result.unhandled_error
end
ensure
@capsule.tracker.unregister
inline_executions.each(&:advisory_unlock)
end

non_inline_executions = executions.reject(&:finished_at)
if non_inline_executions.any?
non_inline_jobs = jobs.reject(&:finished_at)
if non_inline_jobs.any?
job_id_to_active_jobs = active_jobs.index_by(&:job_id)
non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue|
executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at|
state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
non_inline_jobs.group_by(&:queue_name).each do |queue_name, jobs_by_queue|
jobs_by_queue.group_by(&:scheduled_at).each do |scheduled_at, jobs_by_queue_and_scheduled_at|
state = { queue_name: queue_name, count: jobs_by_queue_and_scheduled_at.size }
state[:scheduled_at] = scheduled_at if scheduled_at

executed_locally = execute_async? && @capsule&.create_thread(state)
unless executed_locally
state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
state[:count] = job_id_to_active_jobs.values_at(*jobs_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
Notifier.notify(state) unless state[:count].zero?
end
end
Expand Down Expand Up @@ -148,43 +133,30 @@ def enqueue_at(active_job, timestamp)
will_retry_inline = will_execute_inline && CurrentThread.job&.active_job_id == active_job.job_id && !CurrentThread.retry_now

if will_retry_inline
execution = GoodJob::Job.enqueue(
job = GoodJob::Job.enqueue(
active_job,
scheduled_at: scheduled_at
)
elsif will_execute_inline
execution = GoodJob::Job.enqueue(
job = GoodJob::Job.enqueue(
active_job,
scheduled_at: scheduled_at,
create_with_advisory_lock: true
)
begin
result = @capsule.tracker.register { execution.perform(lock_id: @capsule.tracker.id_for_lock) }

retried_execution = result.retried
while retried_execution && (retried_execution.scheduled_at.nil? || retried_execution.scheduled_at <= Time.current)
execution = retried_execution
result = @capsule.tracker.register { execution.perform(lock_id: @capsule.tracker.id_for_lock) }
retried_execution = result.retried
end

Notifier.notify(retried_execution.job_state) if retried_execution&.scheduled_at && retried_execution.scheduled_at > Time.current && send_notify?(active_job)
ensure
execution.advisory_unlock
execution.run_callbacks(:perform_unlocked)
@capsule.tracker.register do
perform_inline(job, notify: send_notify?(active_job))
end
raise result.unhandled_error if result.unhandled_error
else
execution = GoodJob::Job.enqueue(
job = GoodJob::Job.enqueue(
active_job,
scheduled_at: scheduled_at
)

executed_locally = execute_async? && @capsule&.create_thread(execution.job_state)
Notifier.notify(execution.job_state) if !executed_locally && send_notify?(active_job)
executed_locally = execute_async? && @capsule&.create_thread(job.job_state)
Notifier.notify(job.job_state) if !executed_locally && send_notify?(active_job)
end

execution
job
end
end

Expand Down Expand Up @@ -250,5 +222,27 @@ def send_notify?(active_job)

!(active_job.good_job_notify == false || (active_job.class.good_job_notify == false && active_job.good_job_notify.nil?))
end

# @param job [GoodJob::Job] the job to perform, which must be enqueued and advisory locked already
# @param notify [Boolean] whether to send a NOTIFY event for a retried job
def perform_inline(job, notify: true)
result = nil
retried_job = nil

loop do
result = job.perform(lock_id: @capsule.tracker.id_for_lock)
retried_job = result.retried_job
break if retried_job.nil? || retried_job.scheduled_at.nil? || retried_job.scheduled_at > Time.current

job = retried_job
end

Notifier.notify(retried_job.job_state) if notify && retried_job&.scheduled_at && retried_job.scheduled_at > Time.current
ensure
job.advisory_unlock
job.run_callbacks(:perform_unlocked)

raise result.unhandled_error if result.unhandled_error
end
end
end
8 changes: 4 additions & 4 deletions lib/good_job/current_thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module CurrentThread
error_on_retry_stopped
job
execution_interrupted
execution_retried
retried_job
retry_now
].freeze

Expand Down Expand Up @@ -61,11 +61,11 @@ module CurrentThread
# @return [Boolean, nil]
thread_mattr_accessor :execution_interrupted

# @!attribute [rw] execution_retried
# @!attribute [rw] retried_job
# @!scope class
# Execution Retried
# @return [Boolean, nil]
thread_mattr_accessor :execution_retried
# @return [GoodJob::Job, nil]
thread_mattr_accessor :retried_job

# @!attribute [rw] retry_now
# @!scope class
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/good_job/current_thread_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
:error_on_retry,
:error_on_retry_stopped,
:execution_interrupted,
:execution_retried,
:retried_job,
:job,
].each do |accessor|
describe ".#{accessor}" do
Expand Down Expand Up @@ -63,7 +63,7 @@
error_on_retry_stopped: nil,
job: instance_double(GoodJob::Job),
execution_interrupted: nil,
execution_retried: nil,
retried_job: nil,
retry_now: nil,
}

Expand Down

0 comments on commit 6fb7f97

Please sign in to comment.