Skip to content

Commit

Permalink
Add dynamic job priorities
Browse files Browse the repository at this point in the history
- DB migration to add user_guid field to jobs table
- When job is enqueued, get currently active jobs of that user and add +1 to job priority for each active job

Use original priority for reoccurring jobs

- Pass on priority of current delayed job to next delayed job in reoccurring job
- Don't overwrite priority, which was passed into Enqueuer, with base priority

Add unit tests

Use concurrently for index operations. Small refactoring.

Add config parameter for enabling dynamic job priorities

Add migration tests and fix migration

Add test to enqueuer_spec

Use index name when dropping an index
  • Loading branch information
svkrieger committed Apr 17, 2024
1 parent ac514a2 commit 9df4f89
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 5 deletions.
2 changes: 1 addition & 1 deletion app/jobs/enqueuer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def enqueue_job(job)
request_id = ::VCAP::Request.current_id
timeout_job = TimeoutJob.new(job, job_timeout)
logging_context_job = LoggingContextJob.new(timeout_job, request_id)
@opts[:priority] = job_priority unless job_priority.nil?
@opts[:priority] = job_priority unless @opts[:priority] || job_priority.nil?
Delayed::Job.enqueue(logging_context_job, @opts)
end

Expand Down
9 changes: 8 additions & 1 deletion app/jobs/pollable_job_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@ def before_enqueue(job)
resource_type: @handler.resource_type
)
else
user_guid = VCAP::CloudController::UserAuditInfo.from_context(VCAP::CloudController::SecurityContext).user_guid

if VCAP::CloudController::Config.config.get(:jobs, :enable_dynamic_job_priorities) && user_guid
job.values[:priority] += PollableJobModel.number_of_active_jobs_by_user(user_guid)
end

PollableJobModel.create(
delayed_job_guid: job.guid,
state: PollableJobModel::PROCESSING_STATE,
operation: @handler.display_name,
resource_guid: @handler.resource_guid,
resource_type: @handler.resource_type
resource_type: @handler.resource_type,
user_guid: user_guid
)
end
end
Expand Down
7 changes: 4 additions & 3 deletions app/jobs/reoccurring_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def success(current_delayed_job)
elsif next_enqueue_would_exceed_maximum_duration?
expire!
else
enqueue_next_job(pollable_job)
enqueue_next_job(pollable_job, current_delayed_job.priority)
end
end

Expand Down Expand Up @@ -75,10 +75,11 @@ def expire!
raise CloudController::Errors::ApiError.new_from_details('JobTimeout')
end

def enqueue_next_job(pollable_job)
def enqueue_next_job(pollable_job, priority)
opts = {
queue: Jobs::Queues.generic,
run_at: Delayed::Job.db_time_now + next_execution_in
run_at: Delayed::Job.db_time_now + next_execution_in,
priority: priority
}

@retry_number += 1
Expand Down
4 changes: 4 additions & 0 deletions app/models/runtime/pollable_job_model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,9 @@ def self.find_by_delayed_job_guid(delayed_job_guid)

pollable_job
end

# Counts the pollable jobs that belong to the given user and are still active,
# i.e. whose state is either PROCESSING or POLLING.
#
# @param user_guid [String] guid stored in the `user_guid` column of the job rows
# @return whatever `count` yields for the filtered dataset
def self.number_of_active_jobs_by_user(user_guid)
  active_states = %w[PROCESSING POLLING]
  active_jobs = PollableJobModel.where(state: active_states, user_guid: user_guid)
  active_jobs.count
end
end
end
39 changes: 39 additions & 0 deletions db/migrations/20240314131908_add_user_guid_to_jobs_table.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Gives the `jobs` table a `user_guid` column (String, size 255) and an index named
# `jobs_user_guid_index` on it. Every step is guarded so that it is skipped when its
# result already exists (up) or is already gone (down).
Sequel.migration do
  # adding an index concurrently cannot be done within a transaction
  no_transaction

  up do
    case database_type
    when :postgres
      alter_table :jobs do
        add_column :user_guid, String, size: 255, if_not_exists: true
        add_index :user_guid, name: :jobs_user_guid_index, if_not_exists: true, concurrently: true
      end
    when :mysql
      alter_table :jobs do
        # This branch checks for existence explicitly instead of passing `if_not_exists`.
        column_present = @db.schema(:jobs).map(&:first).include?(:user_guid)
        add_column :user_guid, String, size: 255 unless column_present

        index_present = @db.indexes(:jobs).include?(:jobs_user_guid_index)
        # rubocop:disable Sequel/ConcurrentIndex
        add_index :user_guid, name: :jobs_user_guid_index unless index_present
        # rubocop:enable Sequel/ConcurrentIndex
      end
    end
  end

  down do
    case database_type
    when :postgres
      alter_table :jobs do
        drop_index :user_guid, name: :jobs_user_guid_index, if_exists: true, concurrently: true
        drop_column :user_guid, if_exists: true
      end
    when :mysql
      alter_table :jobs do
        # The index is dropped before the column it covers.
        index_present = @db.indexes(:jobs).include?(:jobs_user_guid_index)
        # rubocop:disable Sequel/ConcurrentIndex
        drop_index :user_guid, name: :jobs_user_guid_index if index_present
        # rubocop:enable Sequel/ConcurrentIndex

        column_present = @db.schema(:jobs).map(&:first).include?(:user_guid)
        drop_column :user_guid if column_present
      end
    end
  end
end
1 change: 1 addition & 0 deletions lib/cloud_controller/config_schemas/base/api_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ class ApiSchema < VCAP::Config

jobs: {
global: { timeout_in_seconds: Integer },
optional(:enable_dynamic_job_priorities) => bool,
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
optional(:diego_sync) => { timeout_in_seconds: Integer },
Expand Down
1 change: 1 addition & 0 deletions lib/cloud_controller/config_schemas/base/worker_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class WorkerSchema < VCAP::Config

jobs: {
global: { timeout_in_seconds: Integer },
optional(:enable_dynamic_job_priorities) => bool,
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
optional(:diego_sync) => { timeout_in_seconds: Integer },
Expand Down
90 changes: 90 additions & 0 deletions spec/migrations/20240314131908_add_user_guid_to_jobs_table_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
require 'spec_helper'
require 'migrations/helpers/migration_shared_context'

# Exercises the `user_guid` migration in both directions, including the cases where the
# column and/or index were already created or removed before the migration runs.
RSpec.describe 'migration to add user_guid column to jobs table and add an index for that column', isolation: :truncation do
  include_context 'migration' do
    let(:migration_filename) { '20240314131908_add_user_guid_to_jobs_table.rb' }
  end

  # Runs the Sequel migrator against `migration_to_test` without an explicit target.
  def apply_migration
    Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true)
  end

  # Runs the Sequel migrator against `migration_to_test` with `target: 0`.
  def revert_migration
    Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0)
  end

  describe 'jobs table' do
    it 'adds a column `user_guid`' do
      expect(db[:jobs].columns).not_to include(:user_guid)
      expect { apply_migration }.not_to raise_error
      expect(db[:jobs].columns).to include(:user_guid)
    end

    it 'adds an index on the user_guid column' do
      expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
      expect { apply_migration }.not_to raise_error
      expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
    end

    describe 'idempotency of up' do
      context '`user_guid` column already exists' do
        before { db.add_column :jobs, :user_guid, String, size: 255 }

        it 'does not fail' do
          expect(db[:jobs].columns).to include(:user_guid)
          expect { apply_migration }.not_to raise_error
        end

        it 'continues to create the index' do
          expect(db[:jobs].columns).to include(:user_guid)
          expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
          expect { apply_migration }.not_to raise_error
          expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
        end
      end

      context 'index already exists' do
        before do
          db.add_column :jobs, :user_guid, String, size: 255
          db.add_index :jobs, :user_guid, name: :jobs_user_guid_index
        end

        it 'does not fail' do
          expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
          expect { apply_migration }.not_to raise_error
        end
      end
    end

    describe 'idempotency of down' do
      context 'index does not exist' do
        before do
          apply_migration
          db.drop_index :jobs, :user_guid, name: :jobs_user_guid_index
        end

        it 'does not fail' do
          expect(db[:jobs].columns).to include(:user_guid)
          expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
          expect { revert_migration }.not_to raise_error
        end

        it 'continues to remove the `user_guid` column' do
          expect(db[:jobs].columns).to include(:user_guid)
          expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
          expect { revert_migration }.not_to raise_error
          expect(db[:jobs].columns).not_to include(:user_guid)
        end
      end

      context 'index and column do not exist' do
        before do
          apply_migration
          db.drop_index :jobs, :user_guid, name: :jobs_user_guid_index
          db.drop_column :jobs, :user_guid
        end

        it 'does not fail' do
          expect(db[:jobs].columns).not_to include(:user_guid)
          expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
          expect { revert_migration }.not_to raise_error
        end
      end
    end
  end
end
22 changes: 22 additions & 0 deletions spec/unit/jobs/enqueuer_spec.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'spec_helper'
require 'db_spec_helper'
require 'jobs/enqueuer'
require 'jobs/delete_action_job'
Expand Down Expand Up @@ -133,6 +134,15 @@ module VCAP::CloudController::Jobs
end
end

it 'uses the default priority' do
  # Hold on to the real method so the intercepted call can still be delegated to it.
  real_enqueue = Delayed::Job.method(:enqueue)

  expect(Delayed::Job).to receive(:enqueue) do |wrapped, enqueue_opts|
    # With no priority configured or passed in, the options must not carry one.
    expect(enqueue_opts).not_to include(:priority)
    real_enqueue.call(wrapped, enqueue_opts)
  end

  Enqueuer.new(wrapped_job, opts).enqueue_pollable
end

context 'priority from config' do
let(:priorities) { { priorities: { wrapped_job.display_name.to_sym => 1899 } } }

Expand All @@ -144,6 +154,18 @@ module VCAP::CloudController::Jobs
end
Enqueuer.new(wrapped_job, opts).enqueue_pollable
end

context 'and priority from Enqueuer (e.g. from reoccurring jobs)' do
  it 'uses the priority passed into the Enqueuer' do
    # Set the explicit priority on the options handed to the Enqueuer.
    opts[:priority] = 2000

    # Hold on to the real method so the intercepted call can still be delegated to it.
    real_enqueue = Delayed::Job.method(:enqueue)
    expect(Delayed::Job).to receive(:enqueue) do |wrapped, enqueue_opts|
      expect(enqueue_opts).to include({ priority: 2000 })
      real_enqueue.call(wrapped, enqueue_opts)
    end

    Enqueuer.new(wrapped_job, opts).enqueue_pollable
  end
end
end
end

Expand Down
94 changes: 94 additions & 0 deletions spec/unit/jobs/pollable_job_wrapper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,76 @@ class BigException < StandardError
expect(job_record.reload.state).to eq('COMPLETE')
end

context 'when dynamic job priorities are enabled' do
  before { TestConfig.config[:jobs][:enable_dynamic_job_priorities] = true }

  context 'when there are several active jobs for the current user' do
    let(:security_context) do
      double({
               current_user_email: 'user-email',
               current_user_name: 'user-name',
               current_user: double({ guid: 'user-guid' })
             })
    end
    let(:pollable_job) { PollableJobWrapper.new(job) }

    before do
      stub_const('VCAP::CloudController::SecurityContext', security_context)
      # Three jobs are enqueued up front for the stubbed user.
      3.times { VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue }
    end

    it 'adds +1 to base priority for each active job' do
      TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }

      delayed_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue

      # base priority 20 plus the three jobs enqueued in the before hook
      expect(delayed_job.priority).to eq(23)
    end

    context 'when SecurityContext does not have a user guid' do
      let(:security_context) do
        double({
                 current_user_email: 'user-email',
                 current_user_name: 'user-name',
                 current_user: double({})
               })
      end

      before { stub_const('VCAP::CloudController::SecurityContext', security_context) }

      it 'does not change its delayed job\'s base priority' do
        TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }

        delayed_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue

        expect(delayed_job.priority).to eq(20)
      end

      it 'uses nil for the user_guid' do
        TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }

        delayed_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
        stored_job = VCAP::CloudController::PollableJobModel.find(delayed_job_guid: delayed_job.guid)

        expect(stored_job.user_guid).to be_nil
      end
    end
  end
end

context 'when dynamic job priorities are disabled' do
  # NOTE(review): the flag is not set here, so this presumably relies on the test
  # configuration leaving `enable_dynamic_job_priorities` unset or false; confirm.
  context 'when there are several active jobs for the current user' do
    let(:security_context) do
      double({
               current_user_email: 'user-email',
               current_user_name: 'user-name',
               current_user: double({ guid: 'user-guid' })
             })
    end
    let(:pollable_job) { PollableJobWrapper.new(job) }

    before do
      stub_const('VCAP::CloudController::SecurityContext', security_context)
      # Three jobs are enqueued up front for the stubbed user.
      3.times { VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue }
    end

    it 'does not change its delayed job\'s base priority' do
      TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }

      delayed_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue

      expect(delayed_job.priority).to eq(20)
    end
  end
end

context 'reusing a pollable job' do
let!(:existing) { VCAP::CloudController::PollableJobModel.make }
let(:pollable_job) { PollableJobWrapper.new(job, existing_guid: existing.guid) }
Expand All @@ -60,6 +130,30 @@ class BigException < StandardError
expect(job_record.cf_api_error).to be_nil
end

context 'when dynamic job priorities are enabled' do
  before { TestConfig.config[:jobs][:enable_dynamic_job_priorities] = true }

  context 'when there are several active jobs for the current user' do
    let(:security_context) do
      double({
               current_user_email: 'user-email',
               current_user_name: 'user-name',
               current_user: double({ guid: 'user-guid' })
             })
    end

    before do
      stub_const('VCAP::CloudController::SecurityContext', security_context)
      # Three jobs are enqueued up front for the stubbed user.
      3.times { VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue }
    end

    # `pollable_job` is not defined in this context; it comes from an enclosing scope.
    it 'does not change its delayed job\'s base priority' do
      TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }

      delayed_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue

      expect(delayed_job.priority).to eq(20)
    end
  end
end

context 'when the job defines its state' do
before do
job.define_singleton_method(:pollable_job_state) do
Expand Down
12 changes: 12 additions & 0 deletions spec/unit/jobs/reoccurring_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ def perform
expect(PollableJobModel.first.delayed_job_guid).not_to eq(pollable_job.delayed_job_guid)
end

it 'keeps the delayed job\'s priority when re-enqueuing' do
  TestConfig.config[:jobs][:priorities] = { 'fake-job': 20 }

  # Looks up the priority of the delayed job that the first pollable job record points to.
  current_priority = lambda do
    delayed_guid = PollableJobModel.first.delayed_job_guid
    Delayed::Job.where(guid: delayed_guid).first[:priority]
  end

  # Priority 22 is passed explicitly and differs from the configured value of 20.
  pollable_job = Jobs::Enqueuer.new(FakeJob.new, { queue: Jobs::Queues.generic, priority: 22 }).enqueue_pollable
  expect(current_priority.call).to eq(22)

  execute_all_jobs(expected_successes: 1, expected_failures: 0, jobs_to_execute: 1)

  # After one run the record points at a different delayed job that has the same priority.
  expect(current_priority.call).to eq(22)
  expect(PollableJobModel.first.delayed_job_guid).not_to eq(pollable_job.delayed_job_guid)
end

it 'waits for the polling interval' do
job = FakeJob.new
job.polling_interval_seconds = 95
Expand Down
Loading

0 comments on commit 9df4f89

Please sign in to comment.