diff --git a/app/models/good_job/batch.rb b/app/models/good_job/batch.rb index ba2dcc2d..249272f9 100644 --- a/app/models/good_job/batch.rb +++ b/app/models/good_job/batch.rb @@ -26,10 +26,12 @@ class Batch :enqueued_at, :finished_at, :discarded_at, + :callbacks_finished_at, :enqueued?, :finished?, :succeeded?, :discarded?, + :callbacks_finished?, :description, :description=, :on_finish, @@ -142,7 +144,9 @@ def retry buffer = GoodJob::Adapter::InlineBuffer.capture do record.transaction do record.with_advisory_lock(function: "pg_advisory_xact_lock") do - record.update!(discarded_at: nil, finished_at: nil) + update_attributes = { discarded_at: nil, finished_at: nil } + update_attributes[:callbacks_finished_at] = nil if GoodJob::BatchRecord.callbacks_finished_at_migrated? + record.update!(update_attributes) record.jobs.discarded.each(&:retry_job) record._continue_discard_or_finish(lock: false) end diff --git a/app/models/good_job/batch_record.rb b/app/models/good_job/batch_record.rb index bf162d58..da33303f 100644 --- a/app/models/good_job/batch_record.rb +++ b/app/models/good_job/batch_record.rb @@ -18,11 +18,16 @@ class BatchRecord < BaseRecord scope :not_discarded, -> { where(discarded_at: nil) } scope :succeeded, -> { finished.not_discarded } - scope :finished_before, ->(timestamp) { where(arel_table['finished_at'].lteq(bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime))) } + # TODO: v5 rename this `callbacks_finished_before` + scope :finished_before, lambda { |timestamp| + finished_column_name = callbacks_finished_at_migrated? ? 'callbacks_finished_at' : 'finished_at' + where(arel_table[finished_column_name].lteq(bind_value(finished_column_name, timestamp, ActiveRecord::Type::DateTime))) + } alias_attribute :enqueued?, :enqueued_at alias_attribute :discarded?, :discarded_at alias_attribute :finished?, :finished_at + alias_attribute :callbacks_finished?, :callbacks_finished_at scope :display_all, (lambda do |after_created_at: nil, after_id: nil| query = order(created_at: :desc, id: :desc) @@ -38,6 +43,17 @@ class BatchRecord < BaseRecord query end) + def self.callbacks_finished_at_migrated? + column_names.include?('callbacks_finished_at') + end + + def self.indexes_migrated? + return true if connection.index_name_exists?(:good_job_batches, :index_good_job_batches_for_cleanup) + + migration_pending_warning! + false + end + # Whether the batch has finished and no jobs were discarded # @return [Boolean] def succeeded? @@ -52,13 +68,14 @@ def display_attributes attributes.except('serialized_properties').merge(properties: properties) end - def _continue_discard_or_finish(execution = nil, lock: true) - execution_discarded = execution && execution.finished_at.present? && execution.error.present? + def _continue_discard_or_finish(job = nil, lock: true) + job_discarded = job && job.finished_at.present? && job.error.present? buffer = GoodJob::Adapter::InlineBuffer.capture do advisory_lock_maybe(lock) do Batch.within_thread(batch_id: nil, batch_callback_id: id) do reload - if execution_discarded && !discarded_at + + if job_discarded && !discarded_at update(discarded_at: Time.current) on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present? end @@ -68,6 +85,8 @@ def _continue_discard_or_finish(execution = nil, lock: true) on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present? on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present? end + + update(callbacks_finished_at: Time.current) if finished_at && self.class.callbacks_finished_at_migrated? && callbacks_finished_at.nil? && callback_jobs.where(finished_at: nil).count.zero? end end end diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb index 0aa4f9d9..8bbfebb8 100644 --- a/app/models/good_job/job.rb +++ b/app/models/good_job/job.rb @@ -40,6 +40,7 @@ class Job < BaseRecord set_callback :perform_unlocked, :after, :continue_discard_or_finish_batch belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true + belongs_to :callback_batch, class_name: 'GoodJob::BatchRecord', foreign_key: :batch_callback_id, inverse_of: :callback_jobs, optional: true belongs_to :locked_by_process, class_name: "GoodJob::Process", foreign_key: :locked_by_id, inverse_of: :locked_jobs, optional: true has_many :executions, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: "id", inverse_of: :job, dependent: :delete_all @@ -743,6 +744,7 @@ def reset_batch_values(&block) def continue_discard_or_finish_batch batch._continue_discard_or_finish(self) if batch.present? + callback_batch._continue_discard_or_finish if callback_batch.present? end def active_job_data diff --git a/demo/db/migrate/20240801143343_add_callbacks_finished_at_to_good_job_batches.rb b/demo/db/migrate/20240801143343_add_callbacks_finished_at_to_good_job_batches.rb new file mode 100644 index 00000000..c630a97c --- /dev/null +++ b/demo/db/migrate/20240801143343_add_callbacks_finished_at_to_good_job_batches.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddCallbacksFinishedAtToGoodJobBatches < ActiveRecord::Migration[7.1] + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_job_batches, :callbacks_finished_at) + end + end + + change_table :good_job_batches do |t| + t.datetime :callbacks_finished_at + end + end +end diff --git a/demo/db/migrate/20240801143344_add_indexes_to_good_job_batches.rb b/demo/db/migrate/20240801143344_add_indexes_to_good_job_batches.rb new file mode 100644 index 00000000..23f43b10 --- /dev/null +++ b/demo/db/migrate/20240801143344_add_indexes_to_good_job_batches.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddIndexesToGoodJobBatches < ActiveRecord::Migration[7.1] + disable_ddl_transaction! + + def change + reversible do |dir| + dir.up do + return if connection.index_name_exists?(:good_job_batches, :index_good_job_batches_for_cleanup) + end + end + + add_index :good_job_batches, [:created_at, :id], name: "index_good_job_batches_for_display", algorithm: :concurrently + add_index :good_job_batches, [:callbacks_finished_at, :discarded_at], order: { callbacks_finished_at: :asc, discarded_at: "ASC NULLS LAST" }, + where: "(callbacks_finished_at IS NOT NULL)", name: "index_good_job_batches_for_cleanup", algorithm: :concurrently + end +end diff --git a/demo/db/schema.rb b/demo/db/schema.rb index d8bc826f..b65cf86a 100644 --- a/demo/db/schema.rb +++ b/demo/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2024_06_13_151310) do +ActiveRecord::Schema.define(version: 2024_08_01_143344) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" @@ -28,6 +28,9 @@ t.datetime "enqueued_at" t.datetime "discarded_at" t.datetime "finished_at" + t.datetime "callbacks_finished_at" + t.index ["callbacks_finished_at", "discarded_at"], name: "index_good_job_batches_for_cleanup", where: "(callbacks_finished_at IS NOT NULL)" + t.index ["created_at", "id"], name: "index_good_job_batches_for_display" end create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index ab5da0a0..22f3fc2d 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -46,6 +46,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.datetime :enqueued_at t.datetime :discarded_at t.datetime :finished_at + t.datetime :callbacks_finished_at end create_table :good_job_executions, id: :uuid do |t| @@ -98,5 +99,8 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> add_index :good_jobs, :locked_by_id, where: "locked_by_id IS NOT NULL", name: "index_good_jobs_on_locked_by_id" add_index :good_job_executions, [:process_id, :created_at], name: :index_good_job_executions_on_process_id_and_created_at + add_index :good_job_batches, [:created_at, :id], name: "index_good_job_batches_for_display" + add_index :good_job_batches, [:callbacks_finished_at, :discarded_at], order: { callbacks_finished_at: :asc, discarded_at: "ASC NULLS LAST" }, + where: "(callbacks_finished_at IS NOT NULL)", name: "index_good_job_batches_for_cleanup" end end diff --git a/lib/generators/good_job/templates/update/migrations/02_add_callbacks_finished_at_to_good_job_batches.rb.erb b/lib/generators/good_job/templates/update/migrations/02_add_callbacks_finished_at_to_good_job_batches.rb.erb new file mode 100644 index 00000000..bac68f7b --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/02_add_callbacks_finished_at_to_good_job_batches.rb.erb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddCallbacksFinishedAtToGoodJobBatches < ActiveRecord::Migration<%= migration_version %> + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_job_batches, :callbacks_finished_at) + end + end + + change_table :good_job_batches do |t| + t.datetime :callbacks_finished_at + end + end +end diff --git a/lib/generators/good_job/templates/update/migrations/03_add_indexes_to_good_job_batches.rb.erb b/lib/generators/good_job/templates/update/migrations/03_add_indexes_to_good_job_batches.rb.erb new file mode 100644 index 00000000..c8b7d728 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/03_add_indexes_to_good_job_batches.rb.erb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddIndexesToGoodJobBatches < ActiveRecord::Migration<%= migration_version %> + disable_ddl_transaction! + + def change + reversible do |dir| + dir.up do + return if connection.index_name_exists?(:good_job_batches, :index_good_job_batches_for_cleanup) + end + end + + add_index :good_job_batches, [:created_at, :id], name: "index_good_job_batches_for_display", algorithm: :concurrently + add_index :good_job_batches, [:callbacks_finished_at, :discarded_at], order: { callbacks_finished_at: :asc, discarded_at: "ASC NULLS LAST" }, + where: "(callbacks_finished_at IS NOT NULL)", name: "index_good_job_batches_for_cleanup", algorithm: :concurrently + end +end diff --git a/lib/good_job.rb b/lib/good_job.rb index 72728d92..86a509fc 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -171,10 +171,10 @@ def self.restart(timeout: -1) _shutdown_all(Capsule.instances, :restart, timeout: timeout) end - # Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}, {GoodJob::MultiScheduler}, {GoodJob::CronManager}) + # Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}, {GoodJob::MultiScheduler}, {GoodJob::CronManager}, {GoodJob::SharedExecutor}) # @param executables [Array] Objects to shut down. - # @param method_name [:symbol] Method to call, e.g. +:shutdown+ or +:restart+. - # @param timeout [nil,Numeric] + # @param method_name [Symbol] Method to call, e.g. +:shutdown+ or +:restart+. + # @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish. # @param after [Array] Objects to shut down after initial executables shut down. # @return [void] def self._shutdown_all(executables, method_name = :shutdown, timeout: -1, after: []) @@ -290,7 +290,7 @@ def self.deprecator # For use in tests/CI to validate GoodJob is up-to-date. # @return [Boolean] def self.migrated? - true + GoodJob::BatchRecord.indexes_migrated? end end diff --git a/spec/app/models/good_job/batch_record_spec.rb b/spec/app/models/good_job/batch_record_spec.rb index a2d00249..9cebe60d 100644 --- a/spec/app/models/good_job/batch_record_spec.rb +++ b/spec/app/models/good_job/batch_record_spec.rb @@ -42,4 +42,35 @@ expect(result.last).to eq last_job end end + + describe 'callbacks_finished_at' do + it 'is set when all callback jobs are finished' do + batch = described_class.create! + batch.update(enqueued_at: Time.current, finished_at: Time.current) + batch.callback_jobs.create!(finished_at: nil) + batch.callback_jobs.create!(finished_at: Time.current) + + batch._continue_discard_or_finish + + expect(batch.reload.callbacks_finished_at).to be_nil + + batch.callback_jobs.update(finished_at: Time.current) + batch._continue_discard_or_finish + + expect(batch.reload.callbacks_finished_at).to be_within(1.second).of(Time.current) + end + end + + describe 'deletion logic' do + it 'checks callbacks_finished_at instead of finished_at' do + batch = described_class.create! + batch.update(enqueued_at: Time.current, finished_at: Time.current, callbacks_finished_at: nil) + + expect { described_class.finished_before(Time.current).delete_all }.not_to change(described_class, :count) + + batch.update(callbacks_finished_at: Time.current) + + expect { described_class.finished_before(Time.current).delete_all }.to change(described_class, :count).by(-1) + end + end end diff --git a/spec/app/models/good_job/batch_spec.rb b/spec/app/models/good_job/batch_spec.rb index 3f76108f..fe1f83e2 100644 --- a/spec/app/models/good_job/batch_spec.rb +++ b/spec/app/models/good_job/batch_spec.rb @@ -117,11 +117,15 @@ def perform batch.retry - expect(batch.reload).to be_enqueued + batch.reload + expect(batch).to have_attributes(discarded_at: nil, finished_at: nil, callbacks_finished_at: nil) + expect(batch).to be_enqueued GoodJob.perform_inline - expect(batch.reload).to be_succeeded + batch.reload + expect(batch).to have_attributes(discarded_at: nil, finished_at: be_present, callbacks_finished_at: be_present) + expect(batch).to be_succeeded end end diff --git a/spec/integration/batch_spec.rb b/spec/integration/batch_spec.rb index 91f19456..faeefc15 100644 --- a/spec/integration/batch_spec.rb +++ b/spec/integration/batch_spec.rb @@ -309,6 +309,7 @@ def perform(_batch, _params) expect(batch).to be_succeeded expect(batch.callback_active_jobs.count).to eq 1 expect(batch.callback_active_jobs.first).to be_a TestJob::SuccessCallbackJob + expect(batch.callbacks_finished_at).to be_present job, callback_job = GoodJob::Job.order(:created_at).to_a expect(job.status).to eq :succeeded @@ -351,4 +352,20 @@ def perform(_batch, _params) expect { batch.retry }.to change { GoodJob::Job.discarded.count }.by(-1) end end + + describe 'batch deletion' do + it 'deletes batches only after their callback jobs have completed' do + batch = GoodJob::Batch.new + batch.on_finish = "BatchCallbackJob" + batch.enqueue do + TestJob.perform_later + end + + GoodJob.perform_inline + + batch.reload + expect(batch).to be_finished + expect(batch.callbacks_finished_at).to be_within(1.second).of(Time.current) + end + end end diff --git a/spec/lib/good_job_spec.rb b/spec/lib/good_job_spec.rb index 3b1f83f8..d8e2ad2a 100644 --- a/spec/lib/good_job_spec.rb +++ b/spec/lib/good_job_spec.rb @@ -58,7 +58,7 @@ let!(:old_finished_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago) } let!(:old_finished_job_execution) { GoodJob::Execution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) } let!(:old_discarded_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago, error: "Error") } - let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 15.days.ago) } + let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 14.days.ago, callbacks_finished_at: 15.days.ago) } it 'deletes finished jobs' do destroyed_records_count = described_class.cleanup_preserved_jobs(in_batches_of: 1) @@ -110,6 +110,16 @@ expect { old_discarded_job.reload }.not_to raise_error expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end + + it "does not delete batches until their callbacks have finished" do + old_batch.update!(callbacks_finished_at: nil) + described_class.cleanup_preserved_jobs + expect { old_batch.reload }.not_to raise_error + + old_batch.update!(callbacks_finished_at: 15.days.ago) + described_class.cleanup_preserved_jobs + expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound + end end describe '.perform_inline' do