From 044e096a5b6e7cc0878591fd54de48ef204de413 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 31 Jul 2024 16:36:47 -0700 Subject: [PATCH] Do not use advisory lock on heartbeat in production --- README.md | 1 + app/models/good_job/process.rb | 13 ++++++++- lib/good_job/capsule_tracker.rb | 4 +-- lib/good_job/configuration.rb | 10 +++++++ lib/good_job/notifier/process_heartbeat.rb | 5 ++-- spec/app/models/good_job/process_spec.rb | 17 ++++++++++++ spec/lib/good_job/configuration_spec.rb | 31 ++++++++++++++++++++++ spec/lib/good_job/notifier_spec.rb | 28 ++++++++++++++++--- 8 files changed, 101 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 32ef415e7..857525f16 100644 --- a/README.md +++ b/README.md @@ -308,6 +308,7 @@ Available configuration options are: - `logger` ([Rails Logger](https://api.rubyonrails.org/classes/ActiveSupport/Logger.html)) lets you set a custom logger for GoodJob. It should be an instance of a Rails `Logger` (Default: `Rails.logger`). - `preserve_job_records` (boolean) keeps job records in your database even after jobs are completed. (Default: `true`) - `smaller_number_is_higher_priority` (boolean) allows you to specifiy that jobs should be run in ascending order of priority (smallest priority numbers first). This will be enabled by default in the next major version of GoodJob (v4.0), but jobs with the highest priority number are run first by default in all earlier versions of GoodJob. +- `advisory_lock_heartbeat` (boolean) whether to use an advisory lock for the purpose of determining whether an execeution process is active. (Default `true` in Development; `false` in other environments) - `retry_on_unhandled_error` (boolean) causes jobs to be re-queued and retried if they raise an instance of `StandardError`. Be advised this may lead to jobs being repeated infinitely ([see below for more on retries](#retries)). Instances of `Exception`, like SIGINT, will *always* be retried, regardless of this attribute’s value. (Default: `false`) - `on_thread_error` (proc, lambda, or callable) will be called when there is an Exception. It can be useful for logging errors to bug tracking services, like Sentry or Airbrake. Example: diff --git a/app/models/good_job/process.rb b/app/models/good_job/process.rb index f52ea0b33..70eb0f6cd 100644 --- a/app/models/good_job/process.rb +++ b/app/models/good_job/process.rb @@ -56,7 +56,7 @@ def self.cleanup end end - def self.create_record(id:, with_advisory_lock: false) + def self.find_or_create_record(id:, with_advisory_lock: false) attributes = { id: id, state: process_state, @@ -66,6 +66,17 @@ def self.create_record(id:, with_advisory_lock: false) attributes[:lock_type] = :advisory end create!(attributes) + rescue ActiveRecord::RecordNotUnique + find_by(id: id).tap do |existing_record| + next unless existing_record + + if with_advisory_lock + existing_record.advisory_lock! + existing_record.update(lock_type: :advisory, state: process_state, updated_at: Time.current) + else + existing_record.update(lock_type: nil, state: process_state, updated_at: Time.current) + end + end end def self.process_state diff --git a/lib/good_job/capsule_tracker.rb b/lib/good_job/capsule_tracker.rb index 0f57aee84..be9d7d250 100644 --- a/lib/good_job/capsule_tracker.rb +++ b/lib/good_job/capsule_tracker.rb @@ -57,7 +57,7 @@ def id_for_lock if @record @record.refresh_if_stale else - @record = GoodJob::Process.create_record(id: @record_id) + @record = GoodJob::Process.find_or_create_record(id: @record_id) create_refresh_task end value = @record&.id @@ -89,7 +89,7 @@ def register(with_advisory_lock: false) @advisory_locked_connection = WeakRef.new(@record.class.connection) end else - @record = GoodJob::Process.create_record(id: @record_id, with_advisory_lock: true) + @record = GoodJob::Process.find_or_create_record(id: @record_id, with_advisory_lock: true) @advisory_locked_connection = WeakRef.new(@record.class.connection) create_refresh_task end diff --git a/lib/good_job/configuration.rb b/lib/good_job/configuration.rb index 2626dc659..13c2f05c1 100644 --- a/lib/good_job/configuration.rb +++ b/lib/good_job/configuration.rb @@ -385,6 +385,16 @@ def in_webserver? end || false end + # Whether to take an advisory lock on the process record in the notifier reactor. + # @return [Boolean] + def advisory_lock_heartbeat + return options[:advisory_lock_heartbeat] unless options[:advisory_lock_heartbeat].nil? + return rails_config[:advisory_lock_heartbeat] unless rails_config[:advisory_lock_heartbeat].nil? + return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_ADVISORY_LOCK_HEARTBEAT']) unless env['GOOD_JOB_ADVISORY_LOCK_HEARTBEAT'].nil? + + Rails.env.development? + end + private def rails_config diff --git a/lib/good_job/notifier/process_heartbeat.rb b/lib/good_job/notifier/process_heartbeat.rb index f830946ef..e7891d410 100644 --- a/lib/good_job/notifier/process_heartbeat.rb +++ b/lib/good_job/notifier/process_heartbeat.rb @@ -14,9 +14,10 @@ module ProcessHeartbeat # Registers the current process. def register_process + @advisory_lock_heartbeat = GoodJob.configuration.advisory_lock_heartbeat GoodJob::Process.override_connection(connection) do GoodJob::Process.cleanup - @capsule.tracker.register(with_advisory_lock: true) + @capsule.tracker.register(with_advisory_lock: @advisory_lock_heartbeat) end end @@ -33,7 +34,7 @@ def refresh_process # Deregisters the current process. def deregister_process GoodJob::Process.override_connection(connection) do - @capsule.tracker.unregister(with_advisory_lock: true) + @capsule.tracker.unregister(with_advisory_lock: @advisory_lock_heartbeat) end end end diff --git a/spec/app/models/good_job/process_spec.rb b/spec/app/models/good_job/process_spec.rb index 19094bdc6..53722a442 100644 --- a/spec/app/models/good_job/process_spec.rb +++ b/spec/app/models/good_job/process_spec.rb @@ -41,6 +41,23 @@ end end + describe 'find_or_create_record' do + let(:record_id) { '67160140-1bec-4c3b-bc34-1a8b36f87b21' } + + it 'creates a new record' do + record = described_class.find_or_create_record(id: record_id) + expect(record).to be_a(described_class) + end + + it 'updates an existing record' do + record = described_class.find_or_create_record(id: record_id) + record.update!(updated_at: 1.day.ago) + updated_record = described_class.find_or_create_record(id: record_id) + expect(updated_record).to eq(record) + expect(updated_record.updated_at).to be_within(1.second).of(Time.current) + end + end + describe '#basename' do let(:process) { described_class.new state: {} } diff --git a/spec/lib/good_job/configuration_spec.rb b/spec/lib/good_job/configuration_spec.rb index f0903bb94..992813dae 100644 --- a/spec/lib/good_job/configuration_spec.rb +++ b/spec/lib/good_job/configuration_spec.rb @@ -326,4 +326,35 @@ expect(configuration.dashboard_live_poll_enabled).to eq true end end + + describe '#advisory_lock_heartbeat' do + it 'defaults to true in development' do + allow(Rails).to receive(:env) { "development".inquiry } + configuration = described_class.new({}) + expect(configuration.advisory_lock_heartbeat).to be true + end + + it 'defaults to false in other environments' do + allow(Rails).to receive(:env) { "production".inquiry } + configuration = described_class.new({}) + expect(configuration.advisory_lock_heartbeat).to be false + end + + it 'can be overridden by options' do + configuration = described_class.new({ advisory_lock_heartbeat: true }) + expect(configuration.advisory_lock_heartbeat).to be true + end + + it 'can be overridden by rails config' do + allow(Rails.application.config).to receive(:good_job).and_return({ advisory_lock_heartbeat: true }) + configuration = described_class.new({}) + expect(configuration.advisory_lock_heartbeat).to be true + end + + it 'can be overridden by environment variable' do + stub_const 'ENV', ENV.to_hash.merge({ 'GOOD_JOB_ADVISORY_LOCK_HEARTBEAT' => 'true' }) + configuration = described_class.new({}) + expect(configuration.advisory_lock_heartbeat).to be true + end + end end diff --git a/spec/lib/good_job/notifier_spec.rb b/spec/lib/good_job/notifier_spec.rb index 264c443c7..26700c6b4 100644 --- a/spec/lib/good_job/notifier_spec.rb +++ b/spec/lib/good_job/notifier_spec.rb @@ -69,6 +69,7 @@ expect(notifier.connected?(timeout: 5)).to be true expect(notifier.listening?(timeout: 1)).to be false + sleep 1 notifier.shutdown @@ -85,6 +86,7 @@ expect(notifier).to be_listening(timeout: 2) described_class.notify(true) + wait_until { expect(GoodJob.capsule.tracker.id_for_lock).to be_present } wait_until(max: 5) { expect(refreshes.value).to be > 0 } notifier.shutdown @@ -195,14 +197,34 @@ it 'creates and destroys a new Process record' do notifier = described_class.new(enable_listening: true) - wait_until { expect(GoodJob::Process.count).to eq 1 } + wait_until { expect(GoodJob.capsule.tracker.locks).to eq 1 } + # Process record won't be created until the first lock is acquired when not advisory locked + id_for_lock = GoodJob.capsule.tracker.id_for_lock process = GoodJob::Process.first - expect(process.id).to eq GoodJob.capsule.tracker.id_for_lock - expect(process).to be_advisory_locked + expect(process.id).to eq id_for_lock + expect(process).not_to be_advisory_locked notifier.shutdown expect { process.reload }.to raise_error ActiveRecord::RecordNotFound end + + context 'when advisory_lock_heartbeat is true' do + before do + allow(GoodJob.configuration).to receive(:advisory_lock_heartbeat).and_return(true) + end + + it 'takes an advisory lock on the process record' do + notifier = described_class.new(enable_listening: true) + + wait_until { expect(GoodJob::Process.count).to eq 1 } + + process = GoodJob::Process.first + expect(process.id).to eq GoodJob.capsule.tracker.id_for_lock + + notifier.shutdown + expect { process.reload }.to raise_error ActiveRecord::RecordNotFound + end + end end end