Skip to content

Commit

Permalink
Merge pull request #54 from mrexox/feature/support-sidekiq-upto-7
Browse files Browse the repository at this point in the history
Support sidekiq upto 7
  • Loading branch information
gzigzigzeo authored Dec 2, 2022
2 parents 19cc8a8 + 7dfa158 commit 0c68a93
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 53 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ spec/reports
test/tmp
test/version_tmp
tmp
.lefthook-local.yml
8 changes: 8 additions & 0 deletions Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,11 @@ end
appraise 'sidekiq-6.5' do
gem 'sidekiq', '~> 6.5.0'
end

appraise 'sidekiq-7.0' do
gem 'sidekiq', '~> 7.0.0'
end

appraise 'sidekiq-master' do
gem 'sidekiq', github: 'mperham/sidekiq'
end
8 changes: 8 additions & 0 deletions bin/console
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require "bundler/setup"
require "sidekiq/grouping"

require "pry"
Pry.start
7 changes: 7 additions & 0 deletions gemfiles/sidekiq_7.0.gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# This file was generated by Appraisal

source "https://rubygems.org"

gem "sidekiq", "~> 7.0.0"

gemspec path: "../"
9 changes: 7 additions & 2 deletions lefthook.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
# See: github.com/evilmartians/lefthook

pre-commit:
parallel: true
commands:
appraisal:
glob: "{Appraisals,*.gemfile}"
run: echo {staged_files} > /dev/null; bundle exec appraisal install && git add gemfiles/*.gemfile
rubocop:
glob: "*.rb"
glob: "{*.rb,*.gemspec,Gemfile,Rakefile}"
run: bundle exec rubocop -A {staged_files} && git add {staged_files}

pre-push:
commands:
rspec:
run: bundle exec appraisal rspec
glob: "*.rb"
run: echo {push_files} > /dev/null; bundle exec appraisal rspec
1 change: 1 addition & 0 deletions lib/sidekiq/grouping.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "active_support/core_ext/string"
require "active_support/configurable"
require "active_support/core_ext/numeric/time"
require "sidekiq"
require "sidekiq/grouping/version"
require "concurrent"

Expand Down
8 changes: 5 additions & 3 deletions lib/sidekiq/grouping/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ module Config
include ActiveSupport::Configurable

def self.options
if Sidekiq.respond_to?(:[])
Sidekiq[:grouping] || Sidekiq["grouping"] || {}
else
if Sidekiq.respond_to?(:[]) # Sidekiq 6.x
Sidekiq[:grouping] || {}
elsif Sidekiq.respond_to?(:options) # Sidekiq <= 5.x
Sidekiq.options[:grouping] || Sidekiq.options["grouping"] || {}
else # Sidekiq 7.x
Sidekiq.default_configuration[:grouping] || {}
end
end

Expand Down
72 changes: 43 additions & 29 deletions lib/sidekiq/grouping/redis.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# frozen_string_literal: true

require_relative "./redis_dispatcher"

module Sidekiq
module Grouping
class Redis
include RedisDispatcher

PLUCK_SCRIPT = <<-SCRIPT
local pluck_values = redis.call('lpop', KEYS[1], ARGV[1]) or {}
if #pluck_values > 0 then
Expand All @@ -15,61 +19,75 @@ def push_msg(name, msg, remember_unique: false)
redis do |conn|
conn.multi do |pipeline|
sadd = pipeline.respond_to?(:sadd?) ? :sadd? : :sadd
pipeline.public_send(sadd, ns("batches"), name)
pipeline.rpush(ns(name), msg)
redis_connection_call(pipeline, sadd, ns("batches"), name)
redis_connection_call(pipeline, :rpush, ns(name), msg)

if remember_unique
pipeline.public_send(
sadd,
unique_messages_key(name),
msg
redis_connection_call(
pipeline, sadd, unique_messages_key(name), msg
)
end
end
end
end

def enqueued?(name, msg)
redis do |conn|
conn.sismember(unique_messages_key(name), msg)
end
member = redis_call(:sismember, unique_messages_key(name), msg)
return member if member.is_a?(TrueClass) || member.is_a?(FalseClass)

member != 0
end

def batch_size(name)
redis { |conn| conn.llen(ns(name)) }
redis_call(:llen, ns(name))
end

def batches
redis { |conn| conn.smembers(ns("batches")) }
redis_call(:smembers, ns("batches"))
end

def pluck(name, limit)
keys = [ns(name), unique_messages_key(name)]
args = [limit]
redis { |conn| conn.eval PLUCK_SCRIPT, keys, args }
if new_redis_client?
redis_call(
:eval,
PLUCK_SCRIPT,
2,
ns(name),
unique_messages_key(name),
limit
)
else
keys = [ns(name), unique_messages_key(name)]
args = [limit]
redis_call(:eval, PLUCK_SCRIPT, keys, args)
end
end

def get_last_execution_time(name)
redis { |conn| conn.get(ns("last_execution_time:#{name}")) }
redis_call(:get, ns("last_execution_time:#{name}"))
end

def set_last_execution_time(name, time)
redis do |conn|
conn.set(ns("last_execution_time:#{name}"), time.to_json)
end
redis_call(
:set, ns("last_execution_time:#{name}"), time.to_json
)
end

def lock(name)
redis do |conn|
id = ns("lock:#{name}")
conn.set(id, true, nx: true, ex: Sidekiq::Grouping::Config.lock_ttl)
end
redis_call(
:set,
ns("lock:#{name}"),
"true",
nx: true,
ex: Sidekiq::Grouping::Config.lock_ttl
)
end

def delete(name)
redis do |conn|
conn.del(ns("last_execution_time:#{name}"))
conn.del(ns(name))
conn.srem(ns("batches"), name)
redis_connection_call(conn, :del, ns("last_execution_time:#{name}"))
redis_connection_call(conn, :del, ns(name))
redis_connection_call(conn, :srem, ns("batches"), name)
end
end

Expand All @@ -82,10 +100,6 @@ def unique_messages_key(name)
def ns(key = nil)
"batching:#{key}"
end

def redis(&block)
Sidekiq.redis(&block)
end
end
end
end
29 changes: 29 additions & 0 deletions lib/sidekiq/grouping/redis_dispatcher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

module Sidekiq
module Grouping
module RedisDispatcher
def redis_call(command, *args, **kwargs)
redis do |connection|
redis_connection_call(connection, command, *args, **kwargs)
end
end

def redis_connection_call(connection, command, *args, **kwargs)
if new_redis_client? # redis-client
connection.call(command.to_s.upcase, *args, **kwargs)
else # redis
connection.public_send(command, *args, **kwargs)
end
end

def new_redis_client?
Sidekiq::VERSION[0].to_i >= 7
end

def redis(&block)
Sidekiq.redis(&block)
end
end
end
end
2 changes: 1 addition & 1 deletion lib/sidekiq/grouping/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sidekiq
module Grouping
VERSION = "1.2.0"
VERSION = "1.3.0"
end
end
3 changes: 2 additions & 1 deletion sidekiq-grouping.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Gem::Specification.new do |spec|

spec.add_development_dependency "appraisal"
spec.add_development_dependency "bundler", "> 1.5"
spec.add_development_dependency "pry"
spec.add_development_dependency "rake"
spec.add_development_dependency "rspec"
spec.add_development_dependency "rspec-sidekiq"
Expand All @@ -33,6 +34,6 @@ Gem::Specification.new do |spec|

spec.add_dependency "activesupport"
spec.add_dependency "concurrent-ruby"
spec.add_dependency "sidekiq", ">= 3.4.2", "< 7"
spec.add_dependency "sidekiq", ">= 3.4.2"
spec.metadata["rubygems_mfa_required"] = "true"
end
22 changes: 9 additions & 13 deletions spec/modules/redis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
require "spec_helper"

describe Sidekiq::Grouping::Redis do
include Sidekiq::Grouping::RedisDispatcher

subject(:redis_service) { described_class.new }

let(:queue_name) { "my_queue" }
Expand All @@ -12,14 +14,14 @@
describe "#push_msg" do
it "adds message to queue", :aggregate_failures do
redis_service.push_msg(queue_name, "My message")
expect(redis { |c| c.llen key }).to eq 1
expect(redis { |c| c.lrange key, 0, 1 }).to eq ["My message"]
expect(redis { |c| c.smembers unique_key }).to eq []
expect(redis_call(:llen, key)).to eq 1
expect(redis_call(:lrange, key, 0, 1)).to eq ["My message"]
expect(redis_call(:smembers, unique_key)).to eq []
end

it "remembers unique message if specified" do
redis_service.push_msg(queue_name, "My message", remember_unique: true)
expect(redis { |c| c.smembers unique_key }).to eq ["My message"]
expect(redis_call(:smembers, unique_key)).to eq ["My message"]
end
end

Expand All @@ -28,21 +30,15 @@
redis_service.push_msg(queue_name, "Message 1")
redis_service.push_msg(queue_name, "Message 2")
redis_service.pluck(queue_name, 2)
expect(redis { |c| c.llen key }).to eq 0
expect(redis_call(:llen, key)).to eq 0
end

it "forgets unique messages", :aggregate_failures do
redis_service.push_msg(queue_name, "Message 1", remember_unique: true)
redis_service.push_msg(queue_name, "Message 2", remember_unique: true)
expect(redis { |c| c.scard unique_key }).to eq 2
expect(redis_call(:scard, unique_key)).to eq 2
redis_service.pluck(queue_name, 2)
expect(redis { |c| c.smembers unique_key }).to eq []
expect(redis_call(:smembers, unique_key)).to eq []
end
end

private

def redis(&block)
Sidekiq.redis(&block)
end
end
16 changes: 12 additions & 4 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
require "sidekiq"
require "rspec-sidekiq"
require "support/test_workers"
require "pry"

SimpleCov.start do
add_filter "spec"
Expand All @@ -17,8 +18,10 @@
require "sidekiq/grouping"

Sidekiq::Grouping.logger = nil
Sidekiq.redis = { db: ENV.fetch("db", 1) }
Sidekiq.logger = nil
Sidekiq.configure_client do |config|
config.redis = { db: 1 }
config.logger = nil
end

RSpec::Sidekiq.configure do |config|
config.clear_all_enqueued_jobs = true
Expand All @@ -32,8 +35,13 @@

config.before do
Sidekiq.redis do |conn|
keys = conn.keys "*batching*"
keys.each { |key| conn.del key }
if Sidekiq::VERSION[0].to_i >= 7
keys = conn.call("KEYS", "*batching*")
keys.each { |key| conn.call("DEL", key) }
else
keys = conn.keys "*batching*"
keys.each { |key| conn.del key }
end
end
end

Expand Down

0 comments on commit 0c68a93

Please sign in to comment.