Skip to content

Commit

Permalink
Merge pull request #78 from nduitz/adjust-to-new-redlock-behaviour
Browse files Browse the repository at this point in the history
Adjust to new redlock behaviour
  • Loading branch information
sharshenov authored Dec 7, 2024
2 parents 1ea6985 + 8c033b6 commit 0e41784
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 2 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ class MyJob < ActiveJob::Base
end
```

### Control redis connection errors

```ruby
class MyJob < ActiveJob::Base
# Proc gets the job instance including its arguments, and as keyword arguments the resource(lock key) `resource` and the original error `error`
unique :until_executing, on_redis_connection_error: ->(job, resource: _, error: _) { job.logger.info "Oops: #{job.arguments}" }

def perform(args)
# work
end
end
```

### Control lock key arguments

```ruby
Expand Down
5 changes: 4 additions & 1 deletion lib/active_job/uniqueness/active_job_patch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module ActiveJobPatch
def unique(strategy, options = {})
validate_on_conflict_action!(options[:on_conflict])
validate_on_conflict_action!(options[:on_runtime_conflict])
validate_on_redis_connection_error!(options[:on_redis_connection_error])

self.lock_strategy_class = ActiveJob::Uniqueness::Strategies.lookup(strategy)
self.lock_options = options
Expand All @@ -40,7 +41,9 @@ def unlock!(*arguments)

private

delegate :validate_on_conflict_action!, to: :'ActiveJob::Uniqueness.config'
delegate :validate_on_conflict_action!,
:validate_on_redis_connection_error!,
to: :'ActiveJob::Uniqueness.config'
end

included do
Expand Down
13 changes: 13 additions & 0 deletions lib/active_job/uniqueness/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Configuration
config_accessor(:lock_ttl) { 86_400 } # 1.day
config_accessor(:lock_prefix) { 'activejob_uniqueness' }
config_accessor(:on_conflict) { :raise }
config_accessor(:on_redis_connection_error) { :raise }
config_accessor(:redlock_servers) { [ENV.fetch('REDIS_URL', 'redis://localhost:6379')] }
config_accessor(:redlock_options) { { retry_count: 0 } }
config_accessor(:lock_strategies) { {} }
Expand All @@ -34,6 +35,18 @@ def validate_on_conflict_action!(action)

raise ActiveJob::Uniqueness::InvalidOnConflictAction, "Unexpected '#{action}' action on conflict"
end

def on_redis_connection_error=(action)
validate_on_redis_connection_error!(action)

config.on_redis_connection_error = action
end

def validate_on_redis_connection_error!(action)
return if action.nil? || action == :raise || action.respond_to?(:call)

raise ActiveJob::Uniqueness::InvalidOnConflictAction, "Unexpected '#{action}' action on_redis_connection_error"
end
end
end
end
17 changes: 16 additions & 1 deletion lib/active_job/uniqueness/strategies/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ class Base

delegate :lock_manager, :config, to: :'ActiveJob::Uniqueness'

attr_reader :lock_key, :lock_ttl, :on_conflict, :job
attr_reader :lock_key, :lock_ttl, :on_conflict, :on_redis_connection_error, :job

def initialize(job:)
@lock_key = job.lock_key
@lock_ttl = (job.lock_options[:lock_ttl] || config.lock_ttl).to_i * 1000 # ms
@on_conflict = job.lock_options[:on_conflict] || config.on_conflict
@on_redis_connection_error = job.lock_options[:on_redis_connection_error] || config.on_redis_connection_error
@job = job
end

Expand Down Expand Up @@ -60,6 +61,12 @@ def before_enqueue

handle_conflict(resource: lock_key, on_conflict: on_conflict)
abort_job
rescue RedisClient::ConnectionError => e
handle_redis_connection_error(
resource: lock_key, on_redis_connection_error:
on_redis_connection_error, error: e
)
abort_job
end

def around_enqueue(block)
Expand All @@ -86,6 +93,14 @@ def handle_conflict(on_conflict:, resource:, event: :conflict)
end
end

def handle_redis_connection_error(resource:, on_redis_connection_error:, error:)
case on_redis_connection_error
when :raise, nil then raise error
else
on_redis_connection_error.call(job, resource: resource, error: error)
end
end

def abort_job
@job_aborted = true # ActiveJob 4.2 workaround

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
#
# config.on_conflict = :raise

# Default action on redis connection error. Can be set per job.
# Allowed values are
# :raise - raises ActiveJob::Uniqueness::JobNotUnique
# proc - custom Proc. For example, ->(job, resource: _, error: _) { job.logger.info("Job already in queue: #{job.class.name} #{job.arguments.inspect} (#{job.job_id})") }
#
# config.on_conflict = :raise

# Digest method for lock keys generating. Expected to have `hexdigest` class method.
#
# config.digest_method = OpenSSL::Digest::MD5
Expand Down
46 changes: 46 additions & 0 deletions spec/support/shared_examples/enqueuing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,52 @@
end
end

context 'when locking fails due to RedisClient error' do
before do
job_class.unique strategy

allow_any_instance_of(ActiveJob::Uniqueness::LockManager).to receive(:lock).and_raise(RedisClient::ConnectionError)
end

shared_examples 'of no jobs enqueued' do
it 'does not enqueue the job' do
expect { suppress(RedisClient::ConnectionError) { subject } }.not_to change(enqueued_jobs, :count)
end

it 'does not remove the existing lock' do
expect { suppress(RedisClient::ConnectionError) { subject } }.not_to change(locks, :count)
end
end

context 'when no options given' do
include_examples 'of no jobs enqueued'

it 'raises a RedisClient::ConnectionError error' do
expect { subject }.to raise_error RedisClient::ConnectionError
end
end

context 'when on_redis_connection_error: :raise given' do
before { job_class.unique strategy, on_redis_connection_error: :raise }

include_examples 'of no jobs enqueued'

it 'raises a RedisClient::ConnectionError error' do
expect { subject }.to raise_error RedisClient::ConnectionError
end
end

context 'when on_redis_connection_error: Proc given' do
before { job_class.unique strategy, on_redis_connection_error: ->(job, **_kwargs) { job.logger.info('Oops') } }

include_examples 'of no jobs enqueued'

it 'calls the Proc' do
expect { subject }.to log(/Oops/)
end
end
end

context 'when the lock exists' do
before do
job_class.unique strategy
Expand Down

0 comments on commit 0e41784

Please sign in to comment.