Skip to content

Commit

Permalink
Do not use advisory lock on heartbeat in production
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Aug 1, 2024
1 parent 0e758d7 commit 044e096
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ Available configuration options are:
- `logger` ([Rails Logger](https://api.rubyonrails.org/classes/ActiveSupport/Logger.html)) lets you set a custom logger for GoodJob. It should be an instance of a Rails `Logger` (Default: `Rails.logger`).
- `preserve_job_records` (boolean) keeps job records in your database even after jobs are completed. (Default: `true`)
- `smaller_number_is_higher_priority` (boolean) allows you to specifiy that jobs should be run in ascending order of priority (smallest priority numbers first). This will be enabled by default in the next major version of GoodJob (v4.0), but jobs with the highest priority number are run first by default in all earlier versions of GoodJob.
- `advisory_lock_heartbeat` (boolean) whether to use an advisory lock for the purpose of determining whether an execeution process is active. (Default `true` in Development; `false` in other environments)
- `retry_on_unhandled_error` (boolean) causes jobs to be re-queued and retried if they raise an instance of `StandardError`. Be advised this may lead to jobs being repeated infinitely ([see below for more on retries](#retries)). Instances of `Exception`, like SIGINT, will *always* be retried, regardless of this attribute’s value. (Default: `false`)
- `on_thread_error` (proc, lambda, or callable) will be called when there is an Exception. It can be useful for logging errors to bug tracking services, like Sentry or Airbrake. Example:
Expand Down
13 changes: 12 additions & 1 deletion app/models/good_job/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def self.cleanup
end
end

def self.create_record(id:, with_advisory_lock: false)
def self.find_or_create_record(id:, with_advisory_lock: false)
attributes = {
id: id,
state: process_state,
Expand All @@ -66,6 +66,17 @@ def self.create_record(id:, with_advisory_lock: false)
attributes[:lock_type] = :advisory
end
create!(attributes)
rescue ActiveRecord::RecordNotUnique
find_by(id: id).tap do |existing_record|
next unless existing_record

if with_advisory_lock
existing_record.advisory_lock!
existing_record.update(lock_type: :advisory, state: process_state, updated_at: Time.current)
else
existing_record.update(lock_type: nil, state: process_state, updated_at: Time.current)
end
end
end

def self.process_state
Expand Down
4 changes: 2 additions & 2 deletions lib/good_job/capsule_tracker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def id_for_lock
if @record
@record.refresh_if_stale
else
@record = GoodJob::Process.create_record(id: @record_id)
@record = GoodJob::Process.find_or_create_record(id: @record_id)
create_refresh_task
end
value = @record&.id
Expand Down Expand Up @@ -89,7 +89,7 @@ def register(with_advisory_lock: false)
@advisory_locked_connection = WeakRef.new(@record.class.connection)
end
else
@record = GoodJob::Process.create_record(id: @record_id, with_advisory_lock: true)
@record = GoodJob::Process.find_or_create_record(id: @record_id, with_advisory_lock: true)
@advisory_locked_connection = WeakRef.new(@record.class.connection)
create_refresh_task
end
Expand Down
10 changes: 10 additions & 0 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,16 @@ def in_webserver?
end || false
end

# Whether to take an advisory lock on the process record in the notifier reactor.
# @return [Boolean]
def advisory_lock_heartbeat
return options[:advisory_lock_heartbeat] unless options[:advisory_lock_heartbeat].nil?
return rails_config[:advisory_lock_heartbeat] unless rails_config[:advisory_lock_heartbeat].nil?
return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_ADVISORY_LOCK_HEARTBEAT']) unless env['GOOD_JOB_ADVISORY_LOCK_HEARTBEAT'].nil?

Rails.env.development?
end

private

def rails_config
Expand Down
5 changes: 3 additions & 2 deletions lib/good_job/notifier/process_heartbeat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ module ProcessHeartbeat

# Registers the current process.
def register_process
@advisory_lock_heartbeat = GoodJob.configuration.advisory_lock_heartbeat
GoodJob::Process.override_connection(connection) do
GoodJob::Process.cleanup
@capsule.tracker.register(with_advisory_lock: true)
@capsule.tracker.register(with_advisory_lock: @advisory_lock_heartbeat)
end
end

Expand All @@ -33,7 +34,7 @@ def refresh_process
# Deregisters the current process.
def deregister_process
GoodJob::Process.override_connection(connection) do
@capsule.tracker.unregister(with_advisory_lock: true)
@capsule.tracker.unregister(with_advisory_lock: @advisory_lock_heartbeat)
end
end
end
Expand Down
17 changes: 17 additions & 0 deletions spec/app/models/good_job/process_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@
end
end

describe 'find_or_create_record' do
let(:record_id) { '67160140-1bec-4c3b-bc34-1a8b36f87b21' }

it 'creates a new record' do
record = described_class.find_or_create_record(id: record_id)
expect(record).to be_a(described_class)
end

it 'updates an existing record' do
record = described_class.find_or_create_record(id: record_id)
record.update!(updated_at: 1.day.ago)
updated_record = described_class.find_or_create_record(id: record_id)
expect(updated_record).to eq(record)
expect(updated_record.updated_at).to be_within(1.second).of(Time.current)
end
end

describe '#basename' do
let(:process) { described_class.new state: {} }

Expand Down
31 changes: 31 additions & 0 deletions spec/lib/good_job/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -326,4 +326,35 @@
expect(configuration.dashboard_live_poll_enabled).to eq true
end
end

describe '#advisory_lock_heartbeat' do
it 'defaults to true in development' do
allow(Rails).to receive(:env) { "development".inquiry }
configuration = described_class.new({})
expect(configuration.advisory_lock_heartbeat).to be true
end

it 'defaults to false in other environments' do
allow(Rails).to receive(:env) { "production".inquiry }
configuration = described_class.new({})
expect(configuration.advisory_lock_heartbeat).to be false
end

it 'can be overridden by options' do
configuration = described_class.new({ advisory_lock_heartbeat: true })
expect(configuration.advisory_lock_heartbeat).to be true
end

it 'can be overridden by rails config' do
allow(Rails.application.config).to receive(:good_job).and_return({ advisory_lock_heartbeat: true })
configuration = described_class.new({})
expect(configuration.advisory_lock_heartbeat).to be true
end

it 'can be overridden by environment variable' do
stub_const 'ENV', ENV.to_hash.merge({ 'GOOD_JOB_ADVISORY_LOCK_HEARTBEAT' => 'true' })
configuration = described_class.new({})
expect(configuration.advisory_lock_heartbeat).to be true
end
end
end
28 changes: 25 additions & 3 deletions spec/lib/good_job/notifier_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

expect(notifier.connected?(timeout: 5)).to be true
expect(notifier.listening?(timeout: 1)).to be false

sleep 1
notifier.shutdown

Expand All @@ -85,6 +86,7 @@
expect(notifier).to be_listening(timeout: 2)
described_class.notify(true)

wait_until { expect(GoodJob.capsule.tracker.id_for_lock).to be_present }
wait_until(max: 5) { expect(refreshes.value).to be > 0 }

notifier.shutdown
Expand Down Expand Up @@ -195,14 +197,34 @@
it 'creates and destroys a new Process record' do
notifier = described_class.new(enable_listening: true)

wait_until { expect(GoodJob::Process.count).to eq 1 }
wait_until { expect(GoodJob.capsule.tracker.locks).to eq 1 }

# Process record won't be created until the first lock is acquired when not advisory locked
id_for_lock = GoodJob.capsule.tracker.id_for_lock
process = GoodJob::Process.first
expect(process.id).to eq GoodJob.capsule.tracker.id_for_lock
expect(process).to be_advisory_locked
expect(process.id).to eq id_for_lock
expect(process).not_to be_advisory_locked

notifier.shutdown
expect { process.reload }.to raise_error ActiveRecord::RecordNotFound
end

context 'when advisory_lock_heartbeat is true' do
before do
allow(GoodJob.configuration).to receive(:advisory_lock_heartbeat).and_return(true)
end

it 'takes an advisory lock on the process record' do
notifier = described_class.new(enable_listening: true)

wait_until { expect(GoodJob::Process.count).to eq 1 }

process = GoodJob::Process.first
expect(process.id).to eq GoodJob.capsule.tracker.id_for_lock

notifier.shutdown
expect { process.reload }.to raise_error ActiveRecord::RecordNotFound
end
end
end
end

0 comments on commit 044e096

Please sign in to comment.