Skip to content

Commit

Permalink
Don't delete batches until all their callback jobs complete
Browse files Browse the repository at this point in the history
Connects to #1387

Add logic to delete batches only after their callback jobs have completed.
  • Loading branch information
bensheldon committed Aug 9, 2024
1 parent 72ba713 commit cb47fd2
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 13 deletions.
6 changes: 5 additions & 1 deletion app/models/good_job/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ class Batch
:enqueued_at,
:finished_at,
:discarded_at,
:callbacks_finished_at,
:enqueued?,
:finished?,
:succeeded?,
:discarded?,
:callbacks_finished?,
:description,
:description=,
:on_finish,
Expand Down Expand Up @@ -142,7 +144,9 @@ def retry
buffer = GoodJob::Adapter::InlineBuffer.capture do
record.transaction do
record.with_advisory_lock(function: "pg_advisory_xact_lock") do
record.update!(discarded_at: nil, finished_at: nil)
update_attributes = { discarded_at: nil, finished_at: nil }
update_attributes[:callbacks_finished_at] = nil if GoodJob::BatchRecord.callbacks_finished_at_migrated?
record.update!(update_attributes)
record.jobs.discarded.each(&:retry_job)
record._continue_discard_or_finish(lock: false)
end
Expand Down
27 changes: 23 additions & 4 deletions app/models/good_job/batch_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ class BatchRecord < BaseRecord
scope :not_discarded, -> { where(discarded_at: nil) }
scope :succeeded, -> { finished.not_discarded }

scope :finished_before, ->(timestamp) { where(arel_table['finished_at'].lteq(bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime))) }
# TODO: v5 rename this `callbacks_finished_before`
scope :finished_before, lambda { |timestamp|
finished_column_name = callbacks_finished_at_migrated? ? 'callbacks_finished_at' : 'finished_at'
where(arel_table[finished_column_name].lteq(bind_value(finished_column_name, timestamp, ActiveRecord::Type::DateTime)))
}

alias_attribute :enqueued?, :enqueued_at
alias_attribute :discarded?, :discarded_at
alias_attribute :finished?, :finished_at
alias_attribute :callbacks_finished?, :callbacks_finished_at

scope :display_all, (lambda do |after_created_at: nil, after_id: nil|
query = order(created_at: :desc, id: :desc)
Expand All @@ -38,6 +43,17 @@ class BatchRecord < BaseRecord
query
end)

def self.callbacks_finished_at_migrated?
column_names.include?('callbacks_finished_at')
end

def self.indexes_migrated?
return true if connection.index_name_exists?(:good_job_batches, :index_good_job_batches_for_cleanup)

migration_pending_warning!
false
end

# Whether the batch has finished and no jobs were discarded
# @return [Boolean]
def succeeded?
Expand All @@ -52,13 +68,14 @@ def display_attributes
attributes.except('serialized_properties').merge(properties: properties)
end

def _continue_discard_or_finish(execution = nil, lock: true)
execution_discarded = execution && execution.finished_at.present? && execution.error.present?
def _continue_discard_or_finish(job = nil, lock: true)
job_discarded = job && job.finished_at.present? && job.error.present?
buffer = GoodJob::Adapter::InlineBuffer.capture do
advisory_lock_maybe(lock) do
Batch.within_thread(batch_id: nil, batch_callback_id: id) do
reload
if execution_discarded && !discarded_at

if job_discarded && !discarded_at
update(discarded_at: Time.current)
on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present?
end
Expand All @@ -68,6 +85,8 @@ def _continue_discard_or_finish(execution = nil, lock: true)
on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present?
on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present?
end

update(callbacks_finished_at: Time.current) if finished_at && self.class.callbacks_finished_at_migrated? && callbacks_finished_at.nil? && callback_jobs.where(finished_at: nil).count.zero?
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions app/models/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Job < BaseRecord
set_callback :perform_unlocked, :after, :continue_discard_or_finish_batch

belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true
belongs_to :callback_batch, class_name: 'GoodJob::BatchRecord', foreign_key: :batch_callback_id, inverse_of: :callback_jobs, optional: true
belongs_to :locked_by_process, class_name: "GoodJob::Process", foreign_key: :locked_by_id, inverse_of: :locked_jobs, optional: true
has_many :executions, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: "id", inverse_of: :job, dependent: :delete_all

Expand Down Expand Up @@ -743,6 +744,7 @@ def reset_batch_values(&block)

def continue_discard_or_finish_batch
batch._continue_discard_or_finish(self) if batch.present?
callback_batch._continue_discard_or_finish if callback_batch.present?
end

def active_job_data
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

class AddCallbacksFinishedAtToGoodJobBatches < ActiveRecord::Migration[7.1]
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.column_exists?(:good_job_batches, :callbacks_finished_at)
end
end

change_table :good_job_batches do |t|
t.datetime :callbacks_finished_at
end
end
end
17 changes: 17 additions & 0 deletions demo/db/migrate/20240801143344_add_indexes_to_good_job_batches.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

class AddIndexesToGoodJobBatches < ActiveRecord::Migration[7.1]
disable_ddl_transaction!

def change
reversible do |dir|
dir.up do
return if connection.index_name_exists?(:good_job_batches, :index_good_job_batches_for_cleanup)
end
end

add_index :good_job_batches, [:created_at, :id], name: "index_good_job_batches_for_display", algorithm: :concurrently
add_index :good_job_batches, [:callbacks_finished_at, :discarded_at], order: { callbacks_finished_at: :asc, discarded_at: "ASC NULLS LAST" },
where: "(callbacks_finished_at IS NOT NULL)", name: "index_good_job_batches_for_cleanup", algorithm: :concurrently
end
end
5 changes: 4 additions & 1 deletion demo/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2024_06_13_151310) do
ActiveRecord::Schema.define(version: 2024_08_01_143344) do
# These are extensions that must be enabled in order to support this database
enable_extension "pgcrypto"
enable_extension "plpgsql"
Expand All @@ -28,6 +28,9 @@
t.datetime "enqueued_at"
t.datetime "discarded_at"
t.datetime "finished_at"
t.datetime "callbacks_finished_at"
t.index ["callbacks_finished_at", "discarded_at"], name: "index_good_job_batches_for_cleanup", where: "(callbacks_finished_at IS NOT NULL)"
t.index ["created_at", "id"], name: "index_good_job_batches_for_display"
end

create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.datetime :enqueued_at
t.datetime :discarded_at
t.datetime :finished_at
t.datetime :callbacks_finished_at
end

create_table :good_job_executions, id: :uuid do |t|
Expand Down Expand Up @@ -98,5 +99,8 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
add_index :good_jobs, :locked_by_id,
where: "locked_by_id IS NOT NULL", name: "index_good_jobs_on_locked_by_id"
add_index :good_job_executions, [:process_id, :created_at], name: :index_good_job_executions_on_process_id_and_created_at
add_index :good_job_batches, [:created_at, :id], name: "index_good_job_batches_for_display"
add_index :good_job_batches, [:callbacks_finished_at, :discarded_at], order: { callbacks_finished_at: :asc, discarded_at: "ASC NULLS LAST" },
where: "(callbacks_finished_at IS NOT NULL)", name: "index_good_job_batches_for_cleanup"
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

class AddCallbacksFinishedAtToGoodJobBatches < ActiveRecord::Migration<%= migration_version %>
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.column_exists?(:good_job_batches, :callbacks_finished_at)
end
end

change_table :good_job_batches do |t|
t.datetime :callbacks_finished_at
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

class AddIndexesToGoodJobBatches < ActiveRecord::Migration<%= migration_version %>
disable_ddl_transaction!

def change
reversible do |dir|
dir.up do
return if connection.index_name_exists?(:good_job_batches, :index_good_job_batches_for_cleanup)
end
end

add_index :good_job_batches, [:created_at, :id], name: "index_good_job_batches_for_display", algorithm: :concurrently
add_index :good_job_batches, [:callbacks_finished_at, :discarded_at], order: { callbacks_finished_at: :asc, discarded_at: "ASC NULLS LAST" },
where: "(callbacks_finished_at IS NOT NULL)", name: "index_good_job_batches_for_cleanup", algorithm: :concurrently
end
end
8 changes: 4 additions & 4 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ def self.restart(timeout: -1)
_shutdown_all(Capsule.instances, :restart, timeout: timeout)
end

# Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}, {GoodJob::MultiScheduler}, {GoodJob::CronManager})
# Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}, {GoodJob::MultiScheduler}, {GoodJob::CronManager}, {GoodJob::SharedExecutor})
# @param executables [Array<Notifier, Poller, Scheduler, MultiScheduler, CronManager, SharedExecutor>] Objects to shut down.
# @param method_name [:symbol] Method to call, e.g. +:shutdown+ or +:restart+.
# @param timeout [nil,Numeric]
# @param method_name [Symbol] Method to call, e.g. +:shutdown+ or +:restart+.
# @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish.
# @param after [Array<Notifier, Poller, Scheduler, MultiScheduler, CronManager, SharedExecutor>] Objects to shut down after initial executables shut down.
# @return [void]
def self._shutdown_all(executables, method_name = :shutdown, timeout: -1, after: [])
Expand Down Expand Up @@ -290,7 +290,7 @@ def self.deprecator
# For use in tests/CI to validate GoodJob is up-to-date.
# @return [Boolean]
def self.migrated?
true
GoodJob::BatchRecord.indexes_migrated?
end
end

Expand Down
31 changes: 31 additions & 0 deletions spec/app/models/good_job/batch_record_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,35 @@
expect(result.last).to eq last_job
end
end

describe 'callbacks_finished_at' do
it 'is set when all callback jobs are finished' do
batch = described_class.create!
batch.update(enqueued_at: Time.current, finished_at: Time.current)
batch.callback_jobs.create!(finished_at: nil)
batch.callback_jobs.create!(finished_at: Time.current)

batch._continue_discard_or_finish

expect(batch.reload.callbacks_finished_at).to be_nil

batch.callback_jobs.update(finished_at: Time.current)
batch._continue_discard_or_finish

expect(batch.reload.callbacks_finished_at).to be_within(1.second).of(Time.current)
end
end

describe 'deletion logic' do
it 'checks callbacks_finished_at instead of finished_at' do
batch = described_class.create!
batch.update(enqueued_at: Time.current, finished_at: Time.current, callbacks_finished_at: nil)

expect { described_class.finished_before(Time.current).delete_all }.not_to change(described_class, :count)

batch.update(callbacks_finished_at: Time.current)

expect { described_class.finished_before(Time.current).delete_all }.to change(described_class, :count).by(-1)
end
end
end
8 changes: 6 additions & 2 deletions spec/app/models/good_job/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,15 @@ def perform

batch.retry

expect(batch.reload).to be_enqueued
batch.reload
expect(batch).to have_attributes(discarded_at: nil, finished_at: nil, callbacks_finished_at: nil)
expect(batch).to be_enqueued

GoodJob.perform_inline

expect(batch.reload).to be_succeeded
batch.reload
expect(batch).to have_attributes(discarded_at: nil, finished_at: be_present, callbacks_finished_at: be_present)
expect(batch).to be_succeeded
end
end

Expand Down
17 changes: 17 additions & 0 deletions spec/integration/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def perform(_batch, _params)
expect(batch).to be_succeeded
expect(batch.callback_active_jobs.count).to eq 1
expect(batch.callback_active_jobs.first).to be_a TestJob::SuccessCallbackJob
expect(batch.callbacks_finished_at).to be_present

job, callback_job = GoodJob::Job.order(:created_at).to_a
expect(job.status).to eq :succeeded
Expand Down Expand Up @@ -351,4 +352,20 @@ def perform(_batch, _params)
expect { batch.retry }.to change { GoodJob::Job.discarded.count }.by(-1)
end
end

describe 'batch deletion' do
it 'deletes batches only after their callback jobs have completed' do
batch = GoodJob::Batch.new
batch.on_finish = "BatchCallbackJob"
batch.enqueue do
TestJob.perform_later
end

GoodJob.perform_inline

batch.reload
expect(batch).to be_finished
expect(batch.callbacks_finished_at).to be_within(1.second).of(Time.current)
end
end
end
12 changes: 11 additions & 1 deletion spec/lib/good_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
let!(:old_finished_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago) }
let!(:old_finished_job_execution) { GoodJob::Execution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) }
let!(:old_discarded_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago, error: "Error") }
let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 15.days.ago) }
let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 14.days.ago, callbacks_finished_at: 15.days.ago) }

it 'deletes finished jobs' do
destroyed_records_count = described_class.cleanup_preserved_jobs(in_batches_of: 1)
Expand Down Expand Up @@ -110,6 +110,16 @@
expect { old_discarded_job.reload }.not_to raise_error
expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound
end

it "does not delete batches until their callbacks have finished" do
old_batch.update!(callbacks_finished_at: nil)
described_class.cleanup_preserved_jobs
expect { old_batch.reload }.not_to raise_error

old_batch.update!(callbacks_finished_at: 15.days.ago)
described_class.cleanup_preserved_jobs
expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound
end
end

describe '.perform_inline' do
Expand Down

0 comments on commit cb47fd2

Please sign in to comment.