Skip to content

Commit

Permalink
support finding litqueue jobs by different criteria, fixes #93
Browse files Browse the repository at this point in the history
  • Loading branch information
oldmoe committed Apr 23, 2024
1 parent 16bc059 commit d191fb3
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 2 deletions.
10 changes: 9 additions & 1 deletion lib/litestack/litejobqueue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,15 @@ def stop
# @@queue = nil
close
end

private

def prepare_search_options(opts)
sql_opts = super(opts)
sql_opts[:klass] = opts[:klass]
sql_opts[:params] = opts[:params]
sql_opts
end

def exit_callback
@running = false # stop all workers
Expand Down Expand Up @@ -180,6 +187,7 @@ def schedule(spawn = false, &block)

# create a worker according to environment
def create_worker
# temporarily stop this feature until a better solution is implemented
#return if defined?(Rails) && !defined?(Rails::Server)
Litescheduler.spawn do
worker_sleep_index = 0
Expand Down
15 changes: 15 additions & 0 deletions lib/litestack/litequeue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,24 @@ def snapshot
queues: queues
}
end

def find(opts = {})
run_stmt(:search, prepare_search_options(opts))
end

private

def prepare_search_options(opts)
sql_opts = {}
sql_opts[:fire_at_from] = opts[:fire_at][0] rescue nil
sql_opts[:fire_at_to] = opts[:fire_at][1] rescue nil
sql_opts[:created_at_from] = opts[:created_at][0] rescue nil
sql_opts[:created_at_to] = opts[:created_at][1] rescue nil
sql_opts[:name] = opts[:queue]
sql_opts[:dir] = opts[:dir] == :desc ? -1 : 1
sql_opts
end

def create_connection
super("#{__dir__}/litequeue.sql.yml") do |conn|
conn.wal_autocheckpoint = 10000
Expand Down
20 changes: 19 additions & 1 deletion lib/litestack/litequeue.sql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,25 @@ stmts:
DELETE FROM queue
WHERE id = $1
RETURNING value;
search: >
SELECT *, :params, value ->> '$.params', value ->> '$.params' LIKE '%'||'three_second'||'%' FROM queue
WHERE
name = ifnull(:name, name)
AND
iif(:fire_at_from, fire_at >= :fire_at_from, true)
AND
iif(:fire_at_to, fire_at <= :fire_at_to, true)
AND
iif(:created_at_from, created_at >= :created_at_from, true)
AND
iif(:created_at_to, created_at <= :created_at_to, true)
AND
iif(:klass IS NOT NULL, value ->> '$.klass' LIKE '%'||:klass||'%', true)
AND
iif(:params IS NOT NULL, value ->> '$.params' LIKE '%'||:params||'%', true)
ORDER BY created_at * :dir;
info: >
SELECT
name,
Expand Down
19 changes: 19 additions & 0 deletions test/test_litejob_rails.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,25 @@ def test_job_is_performed_after_one_second
assert Job.sink[:one_second]
end

def test_find_job_by_class_name_and_params
Job.set(wait: 2.seconds).perform_later(:two_seconds, Time.now)
Job.set(wait: 3.seconds).perform_later(:three_seconds, Time.now)
res = $ljq.find(created_at: [nil, Time.now.to_i + 1])
assert_equal 2, res.length
res = $ljq.find(klass: "Job", params: "seconds")
assert_equal 2, res.length
res = $ljq.find(klass: "NonExistentJob")
assert_equal 0, res.length
res = $ljq.find(params: "SeCoNd")
assert_equal 2, res.length
res = $ljq.find(params: "notfoundparam")
assert_equal 0, res.length
res = $ljq.find(params: "three")
assert_equal 1, res.length
end

private

def wait_for(condition, time)
slept = 0
step = 0.01
Expand Down
20 changes: 20 additions & 0 deletions test/test_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,24 @@ def test_delay
sleep 1
assert !@queue.pop.nil?
end

def test_find
j1 = @queue.push(1, 0, "q1")
j2 = @queue.push(2, 10, "q2")
res = @queue.find({fire_at: [Time.now.to_i+1, nil] })
assert_equal 1, res.length
assert_equal j2[0], res[0][0]
res = @queue.find()
assert_equal 2, res.length
res = @queue.find(created_at: [Time.now.to_i, nil])
assert_equal 2, res.length
res = @queue.find({fire_at: [nil, Time.now.to_i + 1] })
assert_equal 1, res.length
assert_equal j1[0], res[0][0]
res = @queue.find({fire_at: [Time.now.to_i + 1, Time.now.to_i + 11] })
assert_equal 1, res.length
assert_equal j2[0], res[0][0]
res = @queue.find({fire_at: [Time.now.to_i + 1, Time.now.to_i + 2] })
assert_equal 0, res.length
end
end

0 comments on commit d191fb3

Please sign in to comment.