From 6fb7f9793a43c19392112663e88e813d384d28e4 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Mon, 15 Jul 2024 21:53:06 -0700 Subject: [PATCH] Refactor Adapter to reference jobs, not executions --- app/models/good_job/base_execution.rb | 6 +- app/models/good_job/execution_result.rb | 12 +-- lib/good_job/adapter.rb | 106 +++++++++++------------ lib/good_job/current_thread.rb | 8 +- spec/lib/good_job/current_thread_spec.rb | 4 +- 5 files changed, 65 insertions(+), 71 deletions(-) diff --git a/app/models/good_job/base_execution.rb b/app/models/good_job/base_execution.rb index 6ce54eac3..f387d56db 100644 --- a/app/models/good_job/base_execution.rb +++ b/app/models/good_job/base_execution.rb @@ -372,7 +372,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false instrument_payload[:job] = job job.save! - CurrentThread.execution_retried = (job if retried) + CurrentThread.retried_job = job if retried active_job.provider_job_id = job.id raise "These should be equal" if active_job.provider_job_id != active_job.job_id @@ -463,10 +463,10 @@ def perform(lock_id:) instrument_payload.merge!( value: value, handled_error: handled_error, - retried: current_thread.execution_retried.present?, + retried: current_thread.retried_job.present?, error_event: error_event ) - ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried: current_thread.execution_retried) + ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried_job: current_thread.retried_job) rescue StandardError => e error_event = if e.is_a?(GoodJob::InterruptError) :interrupted diff --git a/app/models/good_job/execution_result.rb b/app/models/good_job/execution_result.rb index 2a918642a..9d9b83bca 100644 --- a/app/models/good_job/execution_result.rb +++ b/app/models/good_job/execution_result.rb @@ -13,22 +13,22 @@ class ExecutionResult attr_reader :error_event # @return [Boolean, nil] attr_reader :unexecutable - # @return [GoodJob::Execution, nil] - attr_reader :retried + # @return [GoodJob::Job, nil] + attr_reader :retried_job # @param value [Object, nil] # @param handled_error [Exception, nil] # @param unhandled_error [Exception, nil] # @param error_event [String, nil] # @param unexecutable [Boolean, nil] - # @param retried [Boolean, nil] - def initialize(value:, handled_error: nil, unhandled_error: nil, error_event: nil, unexecutable: nil, retried: nil) + # @param retried_job [GoodJob::Job, nil] + def initialize(value:, handled_error: nil, unhandled_error: nil, error_event: nil, unexecutable: nil, retried_job: nil) @value = value @handled_error = handled_error @unhandled_error = unhandled_error @error_event = error_event @unexecutable = unexecutable - @retried = retried + @retried_job = retried_job end # @return [Boolean] @@ -38,7 +38,7 @@ def succeeded? # @return [Boolean] def retried? - retried.present? + retried_job.present? end end end diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index dae9276f9..4b3aff44f 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -57,7 +57,7 @@ def enqueue_all(active_jobs) Rails.application.executor.wrap do current_time = Time.current - executions = active_jobs.map do |active_job| + jobs = active_jobs.map do |active_job| GoodJob::Job.build_for_enqueue(active_job).tap do |job| job.scheduled_at = current_time if job.scheduled_at == job.created_at job.created_at = current_time @@ -65,62 +65,47 @@ def enqueue_all(active_jobs) end end - inline_executions = [] + inline_jobs = [] GoodJob::Job.transaction(requires_new: true, joinable: false) do - execution_attributes = executions.map(&:attributes) - results = GoodJob::Job.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations + job_attributes = jobs.map(&:attributes) + results = GoodJob::Job.insert_all(job_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] } active_jobs.each do |active_job| active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id] active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=) end - executions.each do |execution| - execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id] + jobs.each do |job| + job.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[job.active_job_id] end - executions = executions.select(&:persisted?) # prune unpersisted executions + jobs = jobs.select(&:persisted?) # prune unpersisted jobs if execute_inline? - inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) } - inline_executions.each(&:advisory_lock!) + inline_jobs = jobs.select { |job| (job.scheduled_at.nil? || job.scheduled_at <= Time.current) } + inline_jobs.each(&:advisory_lock!) end end - @capsule.tracker.register - begin - until inline_executions.empty? - begin - inline_execution = inline_executions.shift - inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock) - - retried_execution = inline_result.retried - while retried_execution && retried_execution.scheduled_at <= Time.current - inline_execution = retried_execution - inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock) - retried_execution = inline_result.retried - end - ensure - inline_execution.advisory_unlock - inline_execution.run_callbacks(:perform_unlocked) + if inline_jobs.any? + @capsule.tracker.register do + until inline_jobs.empty? + inline_job = inline_jobs.shift + perform_inline(inline_job, notify: false) end - raise inline_result.unhandled_error if inline_result.unhandled_error end - ensure - @capsule.tracker.unregister - inline_executions.each(&:advisory_unlock) end - non_inline_executions = executions.reject(&:finished_at) - if non_inline_executions.any? + non_inline_jobs = jobs.reject(&:finished_at) + if non_inline_jobs.any? job_id_to_active_jobs = active_jobs.index_by(&:job_id) - non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue| - executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at| - state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size } + non_inline_jobs.group_by(&:queue_name).each do |queue_name, jobs_by_queue| + jobs_by_queue.group_by(&:scheduled_at).each do |scheduled_at, jobs_by_queue_and_scheduled_at| + state = { queue_name: queue_name, count: jobs_by_queue_and_scheduled_at.size } state[:scheduled_at] = scheduled_at if scheduled_at executed_locally = execute_async? && @capsule&.create_thread(state) unless executed_locally - state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) } + state[:count] = job_id_to_active_jobs.values_at(*jobs_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) } Notifier.notify(state) unless state[:count].zero? end end @@ -148,43 +133,30 @@ def enqueue_at(active_job, timestamp) will_retry_inline = will_execute_inline && CurrentThread.job&.active_job_id == active_job.job_id && !CurrentThread.retry_now if will_retry_inline - execution = GoodJob::Job.enqueue( + job = GoodJob::Job.enqueue( active_job, scheduled_at: scheduled_at ) elsif will_execute_inline - execution = GoodJob::Job.enqueue( + job = GoodJob::Job.enqueue( active_job, scheduled_at: scheduled_at, create_with_advisory_lock: true ) - begin - result = @capsule.tracker.register { execution.perform(lock_id: @capsule.tracker.id_for_lock) } - - retried_execution = result.retried - while retried_execution && (retried_execution.scheduled_at.nil? || retried_execution.scheduled_at <= Time.current) - execution = retried_execution - result = @capsule.tracker.register { execution.perform(lock_id: @capsule.tracker.id_for_lock) } - retried_execution = result.retried - end - - Notifier.notify(retried_execution.job_state) if retried_execution&.scheduled_at && retried_execution.scheduled_at > Time.current && send_notify?(active_job) - ensure - execution.advisory_unlock - execution.run_callbacks(:perform_unlocked) + @capsule.tracker.register do + perform_inline(job, notify: send_notify?(active_job)) end - raise result.unhandled_error if result.unhandled_error else - execution = GoodJob::Job.enqueue( + job = GoodJob::Job.enqueue( active_job, scheduled_at: scheduled_at ) - executed_locally = execute_async? && @capsule&.create_thread(execution.job_state) - Notifier.notify(execution.job_state) if !executed_locally && send_notify?(active_job) + executed_locally = execute_async? && @capsule&.create_thread(job.job_state) + Notifier.notify(job.job_state) if !executed_locally && send_notify?(active_job) end - execution + job end end @@ -250,5 +222,27 @@ def send_notify?(active_job) !(active_job.good_job_notify == false || (active_job.class.good_job_notify == false && active_job.good_job_notify.nil?)) end + + # @param job [GoodJob::Job] the job to perform, which must be enqueued and advisory locked already + # @param notify [Boolean] whether to send a NOTIFY event for a retried job + def perform_inline(job, notify: true) + result = nil + retried_job = nil + + loop do + result = job.perform(lock_id: @capsule.tracker.id_for_lock) + retried_job = result.retried_job + break if retried_job.nil? || retried_job.scheduled_at.nil? || retried_job.scheduled_at > Time.current + + job = retried_job + end + + Notifier.notify(retried_job.job_state) if notify && retried_job&.scheduled_at && retried_job.scheduled_at > Time.current + ensure + job.advisory_unlock + job.run_callbacks(:perform_unlocked) + + raise result.unhandled_error if result.unhandled_error + end end end diff --git a/lib/good_job/current_thread.rb b/lib/good_job/current_thread.rb index eb1cb3db1..5588aef0d 100644 --- a/lib/good_job/current_thread.rb +++ b/lib/good_job/current_thread.rb @@ -15,7 +15,7 @@ module CurrentThread error_on_retry_stopped job execution_interrupted - execution_retried + retried_job retry_now ].freeze @@ -61,11 +61,11 @@ module CurrentThread # @return [Boolean, nil] thread_mattr_accessor :execution_interrupted - # @!attribute [rw] execution_retried + # @!attribute [rw] retried_job # @!scope class # Execution Retried - # @return [Boolean, nil] - thread_mattr_accessor :execution_retried + # @return [GoodJob::Job, nil] + thread_mattr_accessor :retried_job # @!attribute [rw] retry_now # @!scope class diff --git a/spec/lib/good_job/current_thread_spec.rb b/spec/lib/good_job/current_thread_spec.rb index d336f20b8..ea2a4fd0a 100644 --- a/spec/lib/good_job/current_thread_spec.rb +++ b/spec/lib/good_job/current_thread_spec.rb @@ -12,7 +12,7 @@ :error_on_retry, :error_on_retry_stopped, :execution_interrupted, - :execution_retried, + :retried_job, :job, ].each do |accessor| describe ".#{accessor}" do @@ -63,7 +63,7 @@ error_on_retry_stopped: nil, job: instance_double(GoodJob::Job), execution_interrupted: nil, - execution_retried: nil, + retried_job: nil, retry_now: nil, }