From df3f212a8b4e37ba2b952b044be19929937310f7 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Thu, 18 Jul 2024 07:28:16 -0700 Subject: [PATCH] Convert Concurrency extension to use transaction-level (xact) advisory locks --- .../concerns/good_job/advisory_lockable.rb | 21 +++-- .../active_job_extensions/concurrency.rb | 92 ++++++++++--------- .../good_job/advisory_lockable_spec.rb | 53 ++++++----- 3 files changed, 96 insertions(+), 70 deletions(-) diff --git a/app/models/concerns/good_job/advisory_lockable.rb b/app/models/concerns/good_job/advisory_lockable.rb index 4a6ea2f6..f31ae74d 100644 --- a/app/models/concerns/good_job/advisory_lockable.rb +++ b/app/models/concerns/good_job/advisory_lockable.rb @@ -49,8 +49,8 @@ module AdvisoryLockable lock_condition = "#{function}(('x' || substr(md5(#{connection.quote(table_name)} || '-' || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)" query = cte_table.project(cte_table[:id]) - .with(composed_cte) - .where(defined?(Arel::Nodes::BoundSqlLiteral) ? Arel::Nodes::BoundSqlLiteral.new(lock_condition, [], {}) : Arel::Nodes::SqlLiteral.new(lock_condition)) + .with(composed_cte) + .where(defined?(Arel::Nodes::BoundSqlLiteral) ? Arel::Nodes::BoundSqlLiteral.new(lock_condition, [], {}) : Arel::Nodes::SqlLiteral.new(lock_condition)) limit = original_query.arel.ast.limit query.limit = limit.value if limit.present? @@ -174,8 +174,11 @@ def with_advisory_lock(column: _advisory_lockable_column, function: advisory_loc if unlock_session advisory_unlock_session else - records.each do |record| - record.advisory_unlock(key: record.lockable_column_key(column: column), function: advisory_unlockable_function(function)) + unlock_function = advisory_unlockable_function(function) + if unlock_function + records.each do |record| + record.advisory_unlock(key: record.lockable_column_key(column: column), function: unlock_function) + end end end end @@ -209,7 +212,8 @@ def advisory_lock_key(key, function: advisory_lockable_function) begin yield ensure - advisory_unlock_key(key, function: advisory_unlockable_function(function)) + unlock_function = advisory_unlockable_function(function) + advisory_unlock_key(key, function: unlock_function) if unlock_function end end @@ -220,6 +224,9 @@ def advisory_lock_key(key, function: advisory_lockable_function) # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [Boolean] whether the lock was released. def advisory_unlock_key(key, function: advisory_unlockable_function) + raise ArgumentError, "Cannot unlock transactional locks" if function.include? "_xact_" + raise ArgumentError, "No unlock function provide" if function.blank? + query = <<~SQL.squish SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked SQL @@ -284,6 +291,8 @@ def supports_cte_materialization_specifiers? # @param function [String, Symbol] name of advisory lock or unlock function # @return [Boolean] def advisory_unlockable_function(function = advisory_lockable_function) + return nil if function.include? "_xact_" # Cannot unlock transactional locks + function.to_s.sub("_lock", "_unlock").sub("_try_", "_") end @@ -403,7 +412,7 @@ def owns_advisory_lock?(key: lockable_key) # @param key [String, Symbol] Key to lock against # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [void] - def advisory_unlock!(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function)) + def advisory_unlock!(key: lockable_key, function: self.class.advisory_unlockable_function) advisory_unlock(key: key, function: function) while advisory_locked? end diff --git a/lib/good_job/active_job_extensions/concurrency.rb b/lib/good_job/active_job_extensions/concurrency.rb index 257210f1..a7c75ddf 100644 --- a/lib/good_job/active_job_extensions/concurrency.rb +++ b/lib/good_job/active_job_extensions/concurrency.rb @@ -69,34 +69,38 @@ def deserialize(job_data) throttle = enqueue_throttle next unless limit || throttle - GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_lock") do - if limit - enqueue_concurrency = if enqueue_limit - GoodJob::Job.where(concurrency_key: key).unfinished.advisory_unlocked.count - else - GoodJob::Job.where(concurrency_key: key).unfinished.count - end - - # The job has not yet been enqueued, so check if adding it will go over the limit - if (enqueue_concurrency + 1) > limit - logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its enqueue limit of #{limit} #{'job'.pluralize(limit)}" - throw :abort + result = GoodJob::Job.transaction(requires_new: true, joinable: false) do + GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_xact_lock") do + if limit + enqueue_concurrency = if enqueue_limit + GoodJob::Job.where(concurrency_key: key).unfinished.advisory_unlocked.count + else + GoodJob::Job.where(concurrency_key: key).unfinished.count + end + + # The job has not yet been enqueued, so check if adding it will go over the limit + if (enqueue_concurrency + 1) > limit + logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its enqueue limit of #{limit} #{'job'.pluralize(limit)}" + next :abort + end end - end - - if throttle - throttle_limit = throttle[0] - throttle_period = throttle[1] - enqueued_within_period = GoodJob::Job.where(concurrency_key: key) - .where(GoodJob::Job.arel_table[:created_at].gt(throttle_period.ago)) - .count - if (enqueued_within_period + 1) > throttle_limit - logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its throttle limit of #{limit} #{'job'.pluralize(limit)}" - throw :abort + if throttle + throttle_limit = throttle[0] + throttle_period = throttle[1] + enqueued_within_period = GoodJob::Job.where(concurrency_key: key) + .where(GoodJob::Job.arel_table[:created_at].gt(throttle_period.ago)) + .count + + if (enqueued_within_period + 1) > throttle_limit + logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its throttle limit of #{limit} #{'job'.pluralize(limit)}" + next :abort + end end end end + + throw :abort if result == :abort end before_perform do |job| @@ -129,29 +133,31 @@ def deserialize(job_data) next end - GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_lock") do - if limit - allowed_active_job_ids = GoodJob::Job.unfinished.where(concurrency_key: key) - .advisory_locked - .order(Arel.sql("COALESCE(performed_at, scheduled_at, created_at) ASC")) - .limit(limit).pluck(:active_job_id) - # The current job has already been locked and will appear in the previous query - raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError unless allowed_active_job_ids.include?(job.job_id) - end + GoodJob::Job.transaction(requires_new: true, joinable: false) do + GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_xact_lock") do + if limit + allowed_active_job_ids = GoodJob::Job.unfinished.where(concurrency_key: key) + .advisory_locked + .order(Arel.sql("COALESCE(performed_at, scheduled_at, created_at) ASC")) + .limit(limit).pluck(:active_job_id) + # The current job has already been locked and will appear in the previous query + raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError unless allowed_active_job_ids.include?(job.job_id) + end - if throttle - throttle_limit = throttle[0] - throttle_period = throttle[1] + if throttle + throttle_limit = throttle[0] + throttle_period = throttle[1] - query = Execution.joins(:job) - .where(GoodJob::Job.table_name => { concurrency_key: key }) - .where(Execution.arel_table[:created_at].gt(Execution.bind_value('created_at', throttle_period.ago, ActiveRecord::Type::DateTime))) - allowed_active_job_ids = query.where(error: nil).or(query.where.not(error: "GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError: GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError")) - .order(created_at: :asc) - .limit(throttle_limit) - .pluck(:active_job_id) + query = Execution.joins(:job) + .where(GoodJob::Job.table_name => { concurrency_key: key }) + .where(Execution.arel_table[:created_at].gt(Execution.bind_value('created_at', throttle_period.ago, ActiveRecord::Type::DateTime))) + allowed_active_job_ids = query.where(error: nil).or(query.where.not(error: "GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError: GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError")) + .order(created_at: :asc) + .limit(throttle_limit) + .pluck(:active_job_id) - raise ThrottleExceededError unless allowed_active_job_ids.include?(job.job_id) + raise ThrottleExceededError unless allowed_active_job_ids.include?(job.job_id) + end end end end diff --git a/spec/app/models/concerns/good_job/advisory_lockable_spec.rb b/spec/app/models/concerns/good_job/advisory_lockable_spec.rb index 57333601..15d019e8 100644 --- a/spec/app/models/concerns/good_job/advisory_lockable_spec.rb +++ b/spec/app/models/concerns/good_job/advisory_lockable_spec.rb @@ -451,33 +451,44 @@ promise.value! end - it 'transaction-level locks only lock within transactions' do - locked_event = Concurrent::Event.new - commit_event = Concurrent::Event.new - committed_event = Concurrent::Event.new - done_event = Concurrent::Event.new - - promise = rails_promise do - job.class.transaction do - job.advisory_lock(function: "pg_advisory_xact_lock") - locked_event.set + describe "transaction-level-locks" do + it 'only lock within transactions' do + locked_event = Concurrent::Event.new + commit_event = Concurrent::Event.new + committed_event = Concurrent::Event.new + done_event = Concurrent::Event.new + + promise = rails_promise do + job.class.transaction do + job.advisory_lock(function: "pg_advisory_xact_lock") + locked_event.set + + commit_event.wait(10) + end + committed_event.set - commit_event.wait(10) + done_event.wait(10) end - committed_event.set - done_event.wait(10) - end + locked_event.wait(10) + expect(job.advisory_locked?).to be true + commit_event.set - locked_event.wait(10) - expect(job.advisory_locked?).to be true - commit_event.set + committed_event.wait(10) + expect(job.advisory_locked?).to be false - committed_event.wait(10) - expect(job.advisory_locked?).to be false + done_event.set + promise.value! + end - done_event.set - promise.value! + it "locks and unlocks" do + GoodJob::Job.transaction do + job.advisory_lock(function: "pg_advisory_xact_lock") do + expect(job.advisory_locked?).to be true + end + end + expect(job.advisory_locked?).to be false + end end end end