diff --git a/README.md b/README.md index 32ef415e..857525f1 100644 --- a/README.md +++ b/README.md @@ -308,6 +308,7 @@ Available configuration options are: - `logger` ([Rails Logger](https://api.rubyonrails.org/classes/ActiveSupport/Logger.html)) lets you set a custom logger for GoodJob. It should be an instance of a Rails `Logger` (Default: `Rails.logger`). - `preserve_job_records` (boolean) keeps job records in your database even after jobs are completed. (Default: `true`) - `smaller_number_is_higher_priority` (boolean) allows you to specifiy that jobs should be run in ascending order of priority (smallest priority numbers first). This will be enabled by default in the next major version of GoodJob (v4.0), but jobs with the highest priority number are run first by default in all earlier versions of GoodJob. +- `advisory_lock_heartbeat` (boolean) whether to use an advisory lock for the purpose of determining whether an execeution process is active. (Default `true` in Development; `false` in other environments) - `retry_on_unhandled_error` (boolean) causes jobs to be re-queued and retried if they raise an instance of `StandardError`. Be advised this may lead to jobs being repeated infinitely ([see below for more on retries](#retries)). Instances of `Exception`, like SIGINT, will *always* be retried, regardless of this attribute’s value. (Default: `false`) - `on_thread_error` (proc, lambda, or callable) will be called when there is an Exception. It can be useful for logging errors to bug tracking services, like Sentry or Airbrake. Example: diff --git a/app/models/good_job/process.rb b/app/models/good_job/process.rb index f52ea0b3..a7e8e673 100644 --- a/app/models/good_job/process.rb +++ b/app/models/good_job/process.rb @@ -56,16 +56,26 @@ def self.cleanup end end - def self.create_record(id:, with_advisory_lock: false) - attributes = { - id: id, - state: process_state, - } - if with_advisory_lock - attributes[:create_with_advisory_lock] = true - attributes[:lock_type] = :advisory + def self.find_or_create_record(id:, with_advisory_lock: false) + existing_record = find_by(id: id) + if existing_record + if with_advisory_lock + existing_record.advisory_lock! + existing_record.update(lock_type: :advisory, state: process_state) + else + existing_record.update(lock_type: nil, state: process_state) + end + else + attributes = { + id: id, + state: process_state, + } + if with_advisory_lock + attributes[:create_with_advisory_lock] = true + attributes[:lock_type] = :advisory + end + create!(attributes) end - create!(attributes) end def self.process_state diff --git a/lib/good_job/capsule_tracker.rb b/lib/good_job/capsule_tracker.rb index 0f57aee8..be9d7d25 100644 --- a/lib/good_job/capsule_tracker.rb +++ b/lib/good_job/capsule_tracker.rb @@ -57,7 +57,7 @@ def id_for_lock if @record @record.refresh_if_stale else - @record = GoodJob::Process.create_record(id: @record_id) + @record = GoodJob::Process.find_or_create_record(id: @record_id) create_refresh_task end value = @record&.id @@ -89,7 +89,7 @@ def register(with_advisory_lock: false) @advisory_locked_connection = WeakRef.new(@record.class.connection) end else - @record = GoodJob::Process.create_record(id: @record_id, with_advisory_lock: true) + @record = GoodJob::Process.find_or_create_record(id: @record_id, with_advisory_lock: true) @advisory_locked_connection = WeakRef.new(@record.class.connection) create_refresh_task end diff --git a/lib/good_job/configuration.rb b/lib/good_job/configuration.rb index 2626dc65..13c2f05c 100644 --- a/lib/good_job/configuration.rb +++ b/lib/good_job/configuration.rb @@ -385,6 +385,16 @@ def in_webserver? end || false end + # Whether to take an advisory lock on the process record in the notifier reactor. + # @return [Boolean] + def advisory_lock_heartbeat + return options[:advisory_lock_heartbeat] unless options[:advisory_lock_heartbeat].nil? + return rails_config[:advisory_lock_heartbeat] unless rails_config[:advisory_lock_heartbeat].nil? + return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_ADVISORY_LOCK_HEARTBEAT']) unless env['GOOD_JOB_ADVISORY_LOCK_HEARTBEAT'].nil? + + Rails.env.development? + end + private def rails_config diff --git a/lib/good_job/notifier/process_heartbeat.rb b/lib/good_job/notifier/process_heartbeat.rb index f830946e..e7891d41 100644 --- a/lib/good_job/notifier/process_heartbeat.rb +++ b/lib/good_job/notifier/process_heartbeat.rb @@ -14,9 +14,10 @@ module ProcessHeartbeat # Registers the current process. def register_process + @advisory_lock_heartbeat = GoodJob.configuration.advisory_lock_heartbeat GoodJob::Process.override_connection(connection) do GoodJob::Process.cleanup - @capsule.tracker.register(with_advisory_lock: true) + @capsule.tracker.register(with_advisory_lock: @advisory_lock_heartbeat) end end @@ -33,7 +34,7 @@ def refresh_process # Deregisters the current process. def deregister_process GoodJob::Process.override_connection(connection) do - @capsule.tracker.unregister(with_advisory_lock: true) + @capsule.tracker.unregister(with_advisory_lock: @advisory_lock_heartbeat) end end end diff --git a/spec/lib/good_job/configuration_spec.rb b/spec/lib/good_job/configuration_spec.rb index f0903bb9..992813da 100644 --- a/spec/lib/good_job/configuration_spec.rb +++ b/spec/lib/good_job/configuration_spec.rb @@ -326,4 +326,35 @@ expect(configuration.dashboard_live_poll_enabled).to eq true end end + + describe '#advisory_lock_heartbeat' do + it 'defaults to true in development' do + allow(Rails).to receive(:env) { "development".inquiry } + configuration = described_class.new({}) + expect(configuration.advisory_lock_heartbeat).to be true + end + + it 'defaults to false in other environments' do + allow(Rails).to receive(:env) { "production".inquiry } + configuration = described_class.new({}) + expect(configuration.advisory_lock_heartbeat).to be false + end + + it 'can be overridden by options' do + configuration = described_class.new({ advisory_lock_heartbeat: true }) + expect(configuration.advisory_lock_heartbeat).to be true + end + + it 'can be overridden by rails config' do + allow(Rails.application.config).to receive(:good_job).and_return({ advisory_lock_heartbeat: true }) + configuration = described_class.new({}) + expect(configuration.advisory_lock_heartbeat).to be true + end + + it 'can be overridden by environment variable' do + stub_const 'ENV', ENV.to_hash.merge({ 'GOOD_JOB_ADVISORY_LOCK_HEARTBEAT' => 'true' }) + configuration = described_class.new({}) + expect(configuration.advisory_lock_heartbeat).to be true + end + end end diff --git a/spec/lib/good_job/notifier_spec.rb b/spec/lib/good_job/notifier_spec.rb index 264c443c..26700c6b 100644 --- a/spec/lib/good_job/notifier_spec.rb +++ b/spec/lib/good_job/notifier_spec.rb @@ -69,6 +69,7 @@ expect(notifier.connected?(timeout: 5)).to be true expect(notifier.listening?(timeout: 1)).to be false + sleep 1 notifier.shutdown @@ -85,6 +86,7 @@ expect(notifier).to be_listening(timeout: 2) described_class.notify(true) + wait_until { expect(GoodJob.capsule.tracker.id_for_lock).to be_present } wait_until(max: 5) { expect(refreshes.value).to be > 0 } notifier.shutdown @@ -195,14 +197,34 @@ it 'creates and destroys a new Process record' do notifier = described_class.new(enable_listening: true) - wait_until { expect(GoodJob::Process.count).to eq 1 } + wait_until { expect(GoodJob.capsule.tracker.locks).to eq 1 } + # Process record won't be created until the first lock is acquired when not advisory locked + id_for_lock = GoodJob.capsule.tracker.id_for_lock process = GoodJob::Process.first - expect(process.id).to eq GoodJob.capsule.tracker.id_for_lock - expect(process).to be_advisory_locked + expect(process.id).to eq id_for_lock + expect(process).not_to be_advisory_locked notifier.shutdown expect { process.reload }.to raise_error ActiveRecord::RecordNotFound end + + context 'when advisory_lock_heartbeat is true' do + before do + allow(GoodJob.configuration).to receive(:advisory_lock_heartbeat).and_return(true) + end + + it 'takes an advisory lock on the process record' do + notifier = described_class.new(enable_listening: true) + + wait_until { expect(GoodJob::Process.count).to eq 1 } + + process = GoodJob::Process.first + expect(process.id).to eq GoodJob.capsule.tracker.id_for_lock + + notifier.shutdown + expect { process.reload }.to raise_error ActiveRecord::RecordNotFound + end + end end end