Skip to content

Commit

Permalink
Merge pull request #199 from rails/dispatch-scheduled-and-recurring
Browse files Browse the repository at this point in the history
Allow enqueuing scheduled jobs and recurring tasks right away
  • Loading branch information
rosa authored Nov 8, 2024
2 parents 0b91e3b + b52b420 commit d36e5df
Show file tree
Hide file tree
Showing 24 changed files with 102 additions and 23 deletions.
4 changes: 2 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ GEM
rack-session (>= 2.0.0, < 3)
tilt (~> 2.0)
smart_properties (1.17.0)
solid_queue (1.0.0)
solid_queue (1.0.1)
activejob (>= 7.1)
activerecord (>= 7.1)
concurrent-ruby (>= 1.3.1)
Expand Down Expand Up @@ -306,7 +306,7 @@ DEPENDENCIES
rubocop-performance
rubocop-rails-omakase
selenium-webdriver
solid_queue (~> 1.0)
solid_queue (~> 1.0.1)
sprockets-rails
sqlite3

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ This library extends Active Job with a querying interface and the following sett
## Adapter Specifics

- **Resque**: Queue pausing is supported only if you have `resque-pause` installed in your project
- **Solid Queue**: Requires version >= 0.9.
- **Solid Queue**: Requires version >= 1.0.1.

## Advanced configuration

Expand Down
2 changes: 1 addition & 1 deletion app/assets/stylesheets/mission_control/jobs/jobs.css
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ table.jobs {
width: 45%;
}
&.scheduled th.job-header {
width: 60%;
width: 40%;
}
&.finished th.job-header {
width: 65%;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def create

private
def jobs_relation
ActiveJob.jobs.failed
ActiveJob.jobs
end

def redirect_location
Expand Down
13 changes: 9 additions & 4 deletions app/controllers/mission_control/jobs/dispatches_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ class MissionControl::Jobs::DispatchesController < MissionControl::Jobs::Applica

def create
@job.dispatch
redirect_to application_jobs_url(@application, :blocked), notice: "Dispatched job with id #{@job.job_id}"
redirect_to redirect_location, notice: "Dispatched job with id #{@job.job_id}"
end

private
def jobs_relation
ApplicationJob.jobs.blocked
end
def jobs_relation
ActiveJob.jobs
end

def redirect_location
status = @job.status.presence_in(supported_job_statuses) || :blocked
application_jobs_url(@application, status)
end
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
class MissionControl::Jobs::RecurringTasksController < MissionControl::Jobs::ApplicationController
before_action :ensure_supported_recurring_tasks
before_action :set_recurring_task, only: :show
before_action :set_recurring_task, only: [ :show, :update ]

def index
@recurring_tasks = MissionControl::Jobs::Current.server.recurring_tasks
Expand All @@ -10,6 +10,14 @@ def show
@jobs_page = MissionControl::Jobs::Page.new(@recurring_task.jobs, page: params[:page].to_i)
end

def update
if (job = @recurring_task.enqueue) && job.successfully_enqueued?
redirect_to application_job_path(@application, job.job_id), notice: "Enqueued recurring task #{@recurring_task.id}"
else
redirect_to application_recurring_task_path(@application, @recurring_task), alert: "Something went wrong enqueuing this recurring task"
end
end

private
def ensure_supported_recurring_tasks
unless recurring_tasks_supported?
Expand Down
2 changes: 1 addition & 1 deletion app/helpers/mission_control/jobs/jobs_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def failed_job_backtrace(job, server)
def attribute_names_for_job_status(status)
case status.to_s
when "failed" then [ "Error", "" ]
when "blocked" then [ "Queue", "Blocked by", "Block expiry", "" ]
when "blocked" then [ "Queue", "Blocked by", "" ]
when "finished" then [ "Queue", "Finished" ]
when "scheduled" then [ "Queue", "Scheduled", "" ]
when "in_progress" then [ "Queue", "Run by", "Running since" ]
Expand Down
4 changes: 4 additions & 0 deletions app/models/mission_control/jobs/recurring_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ def jobs
ActiveJob::JobsRelation.new(queue_adapter: queue_adapter).where(recurring_task_id: id)
end

def enqueue
queue_adapter.enqueue_recurring_task(id)
end

private
attr_reader :queue_adapter
end
11 changes: 11 additions & 0 deletions app/views/mission_control/jobs/jobs/_general_information.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@
<%= formatted_time(job.enqueued_at.to_datetime) %>
</td>
</tr>
<% if job.scheduled? %>
<tr>
<th>Scheduled</th>
<td>
<%= formatted_time(job.scheduled_at) %>
<% if job_delayed?(job) %>
<div class="is-danger tag ml-4">delayed</div>
<% end %>
</td>
</tr>
<% end %>
<% if job.failed? %>
<tr>
<th>Failed</th>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
<div class="buttons is-right">
<%= button_to "Dispatch", application_job_dispatch_path(@application, job.job_id), class: "button is-warning is-light mr-0" %>
<%= button_to "Run now", application_job_dispatch_path(@application, job.job_id), class: "button is-warning is-light mr-0" %>
</div>
5 changes: 3 additions & 2 deletions app/views/mission_control/jobs/jobs/blocked/_job.html.erb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<td><%= link_to job.queue_name, application_queue_path(@application, job.queue) %></td>
<td><div class="is-family-monospace is-size-7"><%= job.blocked_by %></div></td>
<td><%= formatted_time(job.blocked_until) %></td>
<td><div class="is-family-monospace is-size-7"><%= job.blocked_by %></div>
<div class="has-text-grey is-size-7">Until <%= formatted_time(job.blocked_until) %></div>
</td>
<td class="pr-0">
<%= render "mission_control/jobs/jobs/blocked/actions", job: job %>
</td>
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<div class="buttons is-right">
<%= button_to "Run now", application_job_dispatch_path(@application, job.job_id), class: "button is-warning is-light mr-0" %>
<%= button_to "Discard", application_job_discard_path(@application, job.job_id), class: "button is-danger is-light mr-0",
form: { data: { turbo_confirm: "This will delete the job and can't be undone. Are you sure?" } } %>
</div>
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<div class="buttons is-right">
<%= button_to "Run now", application_recurring_task_path(@application, recurring_task.id), class: "button is-warning is-light mr-0", method: :put %>
</div>
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@
<td> <%= recurring_task.schedule %> </td>
<td><div class="has-text-grey"><%= recurring_task.last_enqueued_at ? formatted_time(recurring_task.last_enqueued_at) : "Never" %></div></td>
<td class="next_time"><div class="has-text-grey"><%= formatted_time(recurring_task.next_time) %></div></td>
<td class="pr-0">
<%= render "mission_control/jobs/recurring_tasks/actions", recurring_task: recurring_task %>
</td>
</tr>
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@
<div class="level-left">
<%= recurring_task.id %>
</div>
<div class="level-right">
<%= render "mission_control/jobs/recurring_tasks/actions", recurring_task: recurring_task %>
</div>
</div>
</h1>
5 changes: 3 additions & 2 deletions app/views/mission_control/jobs/recurring_tasks/index.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
<th></th>
<th>Job</th>
<th>Schedule</th>
<th>Last enqueued at</th>
<th>Next run</th>
<th>Last enqueued</th>
<th>Next</th>
<th></th>
</tr>
</thead>

Expand Down
2 changes: 1 addition & 1 deletion config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
resources :jobs, only: :index, path: ":status/jobs"

resources :workers, only: [ :index, :show ]
resources :recurring_tasks, only: [ :index, :show ]
resources :recurring_tasks, only: [ :index, :show, :update ]
end

# Allow referencing urls without providing an application_id. It will default to the first one.
Expand Down
2 changes: 1 addition & 1 deletion lib/active_job/executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def discard
end

def dispatch
ActiveJob.jobs.blocked.dispatch_job(self)
ActiveJob.jobs.dispatch_job(self)
end

private
Expand Down
5 changes: 5 additions & 0 deletions lib/active_job/jobs_relation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ def discard_job(job)
end

# Dispatch the provided job.
#
# This operation is only valid for blocked or scheduled jobs. It will
# raise an error +ActiveJob::Errors::InvalidOperation+ otherwise.
def dispatch_job(job)
raise ActiveJob::Errors::InvalidOperation, "This operation can only be performed on blocked or scheduled jobs, but this job is #{job.status}" unless job.blocked? || job.scheduled?

queue_adapter.dispatch_job(job, self)
end

Expand Down
10 changes: 7 additions & 3 deletions lib/active_job/queue_adapters/solid_queue_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,13 @@ def execution_error_from_solid_queue_job(solid_queue_job)
end

def dispatch_immediately(job)
SolidQueue::Job.transaction do
job.dispatch_bypassing_concurrency_limits
job.blocked_execution.destroy!
if job.blocked?
SolidQueue::Job.transaction do
job.dispatch_bypassing_concurrency_limits
job.blocked_execution.destroy!
end
else
job.scheduled_execution.update!(scheduled_at: Time.now)
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ def find_recurring_task(task_id)
end
end

def enqueue_recurring_task(task_id)
if task = SolidQueue::RecurringTask.find_by(key: task_id)
task.enqueue(at: Time.now)
end
end

private
def recurring_task_attributes_from_solid_queue_recurring_task(task)
{
Expand Down
2 changes: 1 addition & 1 deletion mission_control-jobs.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "irb", "~> 1.13"

spec.add_development_dependency "resque"
spec.add_development_dependency "solid_queue", "~> 1.0"
spec.add_development_dependency "solid_queue", "~> 1.0.1"
spec.add_development_dependency "selenium-webdriver"
spec.add_development_dependency "resque-pause"
spec.add_development_dependency "mocha"
Expand Down
13 changes: 13 additions & 0 deletions test/controllers/recurring_tasks_controller_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,17 @@ class MissionControl::Jobs::RecurringTasksControllerTest < ActionDispatch::Integ
assert_select "article.is-danger", /Recurring task with id 'invalid_key' not found/
end
end

test "enqueue recurring task successfully" do
schedule_recurring_tasks_async(wait: 0.1.seconds)

assert_difference -> { ActiveJob.jobs.pending.count } do
put mission_control_jobs.application_recurring_task_url(@application, "periodic_pause_job")
assert_response :redirect
end

job = ActiveJob.jobs.pending.last
assert_equal "PauseJob", job.job_class_name
assert_match /jobs\/#{job.job_id}\?server_id=solid_queue\z/, response.location
end
end
13 changes: 12 additions & 1 deletion test/dummy/db/seeds.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def clean_database
end

class JobsLoader
attr_reader :application, :server, :failed_jobs_count, :pending_jobs_count, :finished_jobs_count, :blocked_jobs_count
attr_reader :application, :server, :failed_jobs_count, :pending_jobs_count, :finished_jobs_count, :blocked_jobs_count, :scheduled_jobs_count

def initialize(application, server, failed_jobs_count: 100, pending_jobs_count: 50)
@application = application
Expand All @@ -19,6 +19,7 @@ def initialize(application, server, failed_jobs_count: 100, pending_jobs_count:
@pending_jobs_count = randomize(pending_jobs_count)
@finished_jobs_count = randomize(pending_jobs_count)
@blocked_jobs_count = randomize(pending_jobs_count)
@scheduled_jobs_count = randomize(pending_jobs_count)
end

def load
Expand All @@ -27,6 +28,7 @@ def load
load_failed_jobs
load_pending_jobs
load_blocked_jobs
load_scheduled_jobs
load_recurring_tasks
end
end
Expand Down Expand Up @@ -64,6 +66,15 @@ def load_blocked_jobs
end
end

def load_scheduled_jobs
return unless supported_status?(:scheduled)

puts "Generating #{scheduled_jobs_count} scheduled jobs for #{application} - #{server}..."
scheduled_jobs_count.times do |index|
DummyJob.set(wait: randomize(60).minutes).perform_later(index)
end
end

def load_recurring_tasks
return unless server.queue_adapter.supports_recurring_tasks?

Expand Down

0 comments on commit d36e5df

Please sign in to comment.