Skip to content

Commit

Permalink
Convert Concurrency extension to use transaction-level (xact) advisor…
Browse files Browse the repository at this point in the history
…y locks
  • Loading branch information
bensheldon committed Jul 18, 2024
1 parent 7644b2a commit df3f212
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 70 deletions.
21 changes: 15 additions & 6 deletions app/models/concerns/good_job/advisory_lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ module AdvisoryLockable

lock_condition = "#{function}(('x' || substr(md5(#{connection.quote(table_name)} || '-' || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)"
query = cte_table.project(cte_table[:id])
.with(composed_cte)
.where(defined?(Arel::Nodes::BoundSqlLiteral) ? Arel::Nodes::BoundSqlLiteral.new(lock_condition, [], {}) : Arel::Nodes::SqlLiteral.new(lock_condition))
.with(composed_cte)
.where(defined?(Arel::Nodes::BoundSqlLiteral) ? Arel::Nodes::BoundSqlLiteral.new(lock_condition, [], {}) : Arel::Nodes::SqlLiteral.new(lock_condition))

limit = original_query.arel.ast.limit
query.limit = limit.value if limit.present?
Expand Down Expand Up @@ -174,8 +174,11 @@ def with_advisory_lock(column: _advisory_lockable_column, function: advisory_loc
if unlock_session
advisory_unlock_session
else
records.each do |record|
record.advisory_unlock(key: record.lockable_column_key(column: column), function: advisory_unlockable_function(function))
unlock_function = advisory_unlockable_function(function)
if unlock_function
records.each do |record|
record.advisory_unlock(key: record.lockable_column_key(column: column), function: unlock_function)
end
end
end
end
Expand Down Expand Up @@ -209,7 +212,8 @@ def advisory_lock_key(key, function: advisory_lockable_function)
begin
yield
ensure
advisory_unlock_key(key, function: advisory_unlockable_function(function))
unlock_function = advisory_unlockable_function(function)
advisory_unlock_key(key, function: unlock_function) if unlock_function
end
end

Expand All @@ -220,6 +224,9 @@ def advisory_lock_key(key, function: advisory_lockable_function)
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [Boolean] whether the lock was released.
def advisory_unlock_key(key, function: advisory_unlockable_function)
raise ArgumentError, "Cannot unlock transactional locks" if function.include? "_xact_"
raise ArgumentError, "No unlock function provide" if function.blank?

query = <<~SQL.squish
SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked
SQL
Expand Down Expand Up @@ -284,6 +291,8 @@ def supports_cte_materialization_specifiers?
# @param function [String, Symbol] name of advisory lock or unlock function
# @return [Boolean]
def advisory_unlockable_function(function = advisory_lockable_function)
return nil if function.include? "_xact_" # Cannot unlock transactional locks

function.to_s.sub("_lock", "_unlock").sub("_try_", "_")
end

Expand Down Expand Up @@ -403,7 +412,7 @@ def owns_advisory_lock?(key: lockable_key)
# @param key [String, Symbol] Key to lock against
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [void]
def advisory_unlock!(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function))
def advisory_unlock!(key: lockable_key, function: self.class.advisory_unlockable_function)
advisory_unlock(key: key, function: function) while advisory_locked?
end

Expand Down
92 changes: 49 additions & 43 deletions lib/good_job/active_job_extensions/concurrency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,38 @@ def deserialize(job_data)
throttle = enqueue_throttle
next unless limit || throttle

GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_lock") do
if limit
enqueue_concurrency = if enqueue_limit
GoodJob::Job.where(concurrency_key: key).unfinished.advisory_unlocked.count
else
GoodJob::Job.where(concurrency_key: key).unfinished.count
end

# The job has not yet been enqueued, so check if adding it will go over the limit
if (enqueue_concurrency + 1) > limit
logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its enqueue limit of #{limit} #{'job'.pluralize(limit)}"
throw :abort
result = GoodJob::Job.transaction(requires_new: true, joinable: false) do
GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_xact_lock") do
if limit
enqueue_concurrency = if enqueue_limit
GoodJob::Job.where(concurrency_key: key).unfinished.advisory_unlocked.count
else
GoodJob::Job.where(concurrency_key: key).unfinished.count
end

# The job has not yet been enqueued, so check if adding it will go over the limit
if (enqueue_concurrency + 1) > limit
logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its enqueue limit of #{limit} #{'job'.pluralize(limit)}"
next :abort
end
end
end

if throttle
throttle_limit = throttle[0]
throttle_period = throttle[1]
enqueued_within_period = GoodJob::Job.where(concurrency_key: key)
.where(GoodJob::Job.arel_table[:created_at].gt(throttle_period.ago))
.count

if (enqueued_within_period + 1) > throttle_limit
logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its throttle limit of #{limit} #{'job'.pluralize(limit)}"
throw :abort
if throttle
throttle_limit = throttle[0]
throttle_period = throttle[1]
enqueued_within_period = GoodJob::Job.where(concurrency_key: key)
.where(GoodJob::Job.arel_table[:created_at].gt(throttle_period.ago))
.count

if (enqueued_within_period + 1) > throttle_limit
logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its throttle limit of #{limit} #{'job'.pluralize(limit)}"
next :abort
end
end
end
end

throw :abort if result == :abort
end

before_perform do |job|
Expand Down Expand Up @@ -129,29 +133,31 @@ def deserialize(job_data)
next
end

GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_lock") do
if limit
allowed_active_job_ids = GoodJob::Job.unfinished.where(concurrency_key: key)
.advisory_locked
.order(Arel.sql("COALESCE(performed_at, scheduled_at, created_at) ASC"))
.limit(limit).pluck(:active_job_id)
# The current job has already been locked and will appear in the previous query
raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError unless allowed_active_job_ids.include?(job.job_id)
end
GoodJob::Job.transaction(requires_new: true, joinable: false) do
GoodJob::Job.advisory_lock_key(key, function: "pg_advisory_xact_lock") do
if limit
allowed_active_job_ids = GoodJob::Job.unfinished.where(concurrency_key: key)
.advisory_locked
.order(Arel.sql("COALESCE(performed_at, scheduled_at, created_at) ASC"))
.limit(limit).pluck(:active_job_id)
# The current job has already been locked and will appear in the previous query
raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError unless allowed_active_job_ids.include?(job.job_id)
end

if throttle
throttle_limit = throttle[0]
throttle_period = throttle[1]
if throttle
throttle_limit = throttle[0]
throttle_period = throttle[1]

query = Execution.joins(:job)
.where(GoodJob::Job.table_name => { concurrency_key: key })
.where(Execution.arel_table[:created_at].gt(Execution.bind_value('created_at', throttle_period.ago, ActiveRecord::Type::DateTime)))
allowed_active_job_ids = query.where(error: nil).or(query.where.not(error: "GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError: GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError"))
.order(created_at: :asc)
.limit(throttle_limit)
.pluck(:active_job_id)
query = Execution.joins(:job)
.where(GoodJob::Job.table_name => { concurrency_key: key })
.where(Execution.arel_table[:created_at].gt(Execution.bind_value('created_at', throttle_period.ago, ActiveRecord::Type::DateTime)))
allowed_active_job_ids = query.where(error: nil).or(query.where.not(error: "GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError: GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError"))
.order(created_at: :asc)
.limit(throttle_limit)
.pluck(:active_job_id)

raise ThrottleExceededError unless allowed_active_job_ids.include?(job.job_id)
raise ThrottleExceededError unless allowed_active_job_ids.include?(job.job_id)
end
end
end
end
Expand Down
53 changes: 32 additions & 21 deletions spec/app/models/concerns/good_job/advisory_lockable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -451,33 +451,44 @@
promise.value!
end

it 'transaction-level locks only lock within transactions' do
locked_event = Concurrent::Event.new
commit_event = Concurrent::Event.new
committed_event = Concurrent::Event.new
done_event = Concurrent::Event.new

promise = rails_promise do
job.class.transaction do
job.advisory_lock(function: "pg_advisory_xact_lock")
locked_event.set
describe "transaction-level-locks" do
it 'only lock within transactions' do
locked_event = Concurrent::Event.new
commit_event = Concurrent::Event.new
committed_event = Concurrent::Event.new
done_event = Concurrent::Event.new

promise = rails_promise do
job.class.transaction do
job.advisory_lock(function: "pg_advisory_xact_lock")
locked_event.set

commit_event.wait(10)
end
committed_event.set

commit_event.wait(10)
done_event.wait(10)
end
committed_event.set

done_event.wait(10)
end
locked_event.wait(10)
expect(job.advisory_locked?).to be true
commit_event.set

locked_event.wait(10)
expect(job.advisory_locked?).to be true
commit_event.set
committed_event.wait(10)
expect(job.advisory_locked?).to be false

committed_event.wait(10)
expect(job.advisory_locked?).to be false
done_event.set
promise.value!
end

done_event.set
promise.value!
it "locks and unlocks" do
GoodJob::Job.transaction do
job.advisory_lock(function: "pg_advisory_xact_lock") do
expect(job.advisory_locked?).to be true
end
end
expect(job.advisory_locked?).to be false
end
end
end
end

0 comments on commit df3f212

Please sign in to comment.