Skip to content

Commit

Permalink
Merge pull request cloudfoundry#3703 from sap-contributions/add-dynam…
Browse files Browse the repository at this point in the history
…ic-job-prios

Add dynamic job priorities
  • Loading branch information
svkrieger authored Apr 18, 2024
2 parents f27e6b5 + 9df4f89 commit 3eaad82
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 5 deletions.
2 changes: 1 addition & 1 deletion app/jobs/enqueuer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def enqueue_job(job)
request_id = ::VCAP::Request.current_id
timeout_job = TimeoutJob.new(job, job_timeout)
logging_context_job = LoggingContextJob.new(timeout_job, request_id)
@opts[:priority] = job_priority unless job_priority.nil?
@opts[:priority] = job_priority unless @opts[:priority] || job_priority.nil?
Delayed::Job.enqueue(logging_context_job, @opts)
end

Expand Down
9 changes: 8 additions & 1 deletion app/jobs/pollable_job_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@ def before_enqueue(job)
resource_type: @handler.resource_type
)
else
user_guid = VCAP::CloudController::UserAuditInfo.from_context(VCAP::CloudController::SecurityContext).user_guid

if VCAP::CloudController::Config.config.get(:jobs, :enable_dynamic_job_priorities) && user_guid
job.values[:priority] += PollableJobModel.number_of_active_jobs_by_user(user_guid)
end

PollableJobModel.create(
delayed_job_guid: job.guid,
state: PollableJobModel::PROCESSING_STATE,
operation: @handler.display_name,
resource_guid: @handler.resource_guid,
resource_type: @handler.resource_type
resource_type: @handler.resource_type,
user_guid: user_guid
)
end
end
Expand Down
7 changes: 4 additions & 3 deletions app/jobs/reoccurring_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def success(current_delayed_job)
elsif next_enqueue_would_exceed_maximum_duration?
expire!
else
enqueue_next_job(pollable_job)
enqueue_next_job(pollable_job, current_delayed_job.priority)
end
end

Expand Down Expand Up @@ -75,10 +75,11 @@ def expire!
raise CloudController::Errors::ApiError.new_from_details('JobTimeout')
end

def enqueue_next_job(pollable_job)
def enqueue_next_job(pollable_job, priority)
opts = {
queue: Jobs::Queues.generic,
run_at: Delayed::Job.db_time_now + next_execution_in
run_at: Delayed::Job.db_time_now + next_execution_in,
priority: priority
}

@retry_number += 1
Expand Down
4 changes: 4 additions & 0 deletions app/models/runtime/pollable_job_model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,9 @@ def self.find_by_delayed_job_guid(delayed_job_guid)

pollable_job
end

def self.number_of_active_jobs_by_user(user_guid)
PollableJobModel.where(state: %w[PROCESSING POLLING], user_guid: user_guid).count
end
end
end
39 changes: 39 additions & 0 deletions db/migrations/20240314131908_add_user_guid_to_jobs_table.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
Sequel.migration do
# adding an index concurrently cannot be done within a transaction
no_transaction

up do
if database_type == :postgres
alter_table :jobs do
add_column :user_guid, String, size: 255, if_not_exists: true
add_index :user_guid, name: :jobs_user_guid_index, if_not_exists: true, concurrently: true
end

elsif database_type == :mysql
alter_table :jobs do
add_column :user_guid, String, size: 255 unless @db.schema(:jobs).map(&:first).include?(:user_guid)
# rubocop:disable Sequel/ConcurrentIndex
add_index :user_guid, name: :jobs_user_guid_index unless @db.indexes(:jobs).include?(:jobs_user_guid_index)
# rubocop:enable Sequel/ConcurrentIndex
end
end
end

down do
if database_type == :postgres
alter_table :jobs do
drop_index :user_guid, name: :jobs_user_guid_index, if_exists: true, concurrently: true
drop_column :user_guid, if_exists: true
end
end

if database_type == :mysql
alter_table :jobs do
# rubocop:disable Sequel/ConcurrentIndex
drop_index :user_guid, name: :jobs_user_guid_index if @db.indexes(:jobs).include?(:jobs_user_guid_index)
# rubocop:enable Sequel/ConcurrentIndex
drop_column :user_guid if @db.schema(:jobs).map(&:first).include?(:user_guid)
end
end
end
end
1 change: 1 addition & 0 deletions lib/cloud_controller/config_schemas/base/api_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ class ApiSchema < VCAP::Config

jobs: {
global: { timeout_in_seconds: Integer },
optional(:enable_dynamic_job_priorities) => bool,
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
optional(:diego_sync) => { timeout_in_seconds: Integer },
Expand Down
1 change: 1 addition & 0 deletions lib/cloud_controller/config_schemas/base/worker_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class WorkerSchema < VCAP::Config

jobs: {
global: { timeout_in_seconds: Integer },
optional(:enable_dynamic_job_priorities) => bool,
optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer },
optional(:blobstore_delete) => { timeout_in_seconds: Integer },
optional(:diego_sync) => { timeout_in_seconds: Integer },
Expand Down
90 changes: 90 additions & 0 deletions spec/migrations/20240314131908_add_user_guid_to_jobs_table_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
require 'spec_helper'
require 'migrations/helpers/migration_shared_context'

RSpec.describe 'migration to add user_guid column to jobs table and add an index for that column', isolation: :truncation do
include_context 'migration' do
let(:migration_filename) { '20240314131908_add_user_guid_to_jobs_table.rb' }
end

describe 'jobs table' do
it 'adds a column `user_guid`' do
expect(db[:jobs].columns).not_to include(:user_guid)
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
expect(db[:jobs].columns).to include(:user_guid)
end

it 'adds an index on the user_guid column' do
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
end

describe 'idempotency of up' do
context '`user_guid` column already exists' do
before do
db.add_column :jobs, :user_guid, String, size: 255
end

it 'does not fail' do
expect(db[:jobs].columns).to include(:user_guid)
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
end

it 'continues to create the index' do
expect(db[:jobs].columns).to include(:user_guid)
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
end
end

context 'index already exists' do
before do
db.add_column :jobs, :user_guid, String, size: 255
db.add_index :jobs, :user_guid, name: :jobs_user_guid_index
end

it 'does not fail' do
expect(db.indexes(:jobs)).to include(:jobs_user_guid_index)
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error
end
end
end

describe 'idempotency of down' do
context 'index does not exist' do
before do
Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true)
db.drop_index :jobs, :user_guid, name: :jobs_user_guid_index
end

it 'does not fail' do
expect(db[:jobs].columns).to include(:user_guid)
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error
end

it 'continues to remove the `user_guid` column' do
expect(db[:jobs].columns).to include(:user_guid)
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error
expect(db[:jobs].columns).not_to include(:user_guid)
end
end

context 'index and column do not exist' do
before do
Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true)
db.drop_index :jobs, :user_guid, name: :jobs_user_guid_index
db.drop_column :jobs, :user_guid
end

it 'does not fail' do
expect(db[:jobs].columns).not_to include(:user_guid)
expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index)
expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error
end
end
end
end
end
22 changes: 22 additions & 0 deletions spec/unit/jobs/enqueuer_spec.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'spec_helper'
require 'db_spec_helper'
require 'jobs/enqueuer'
require 'jobs/delete_action_job'
Expand Down Expand Up @@ -133,6 +134,15 @@ module VCAP::CloudController::Jobs
end
end

it 'uses the default priority' do
original_enqueue = Delayed::Job.method(:enqueue)
expect(Delayed::Job).to receive(:enqueue) do |enqueued_job, opts|
expect(opts).not_to include(:priority)
original_enqueue.call(enqueued_job, opts)
end
Enqueuer.new(wrapped_job, opts).enqueue_pollable
end

context 'priority from config' do
let(:priorities) { { priorities: { wrapped_job.display_name.to_sym => 1899 } } }

Expand All @@ -144,6 +154,18 @@ module VCAP::CloudController::Jobs
end
Enqueuer.new(wrapped_job, opts).enqueue_pollable
end

context 'and priority from Enqueuer (e.g. from reoccurring jobs)' do
it 'uses the priority passed into the Enqueuer' do
original_enqueue = Delayed::Job.method(:enqueue)
expect(Delayed::Job).to receive(:enqueue) do |enqueued_job, opts|
expect(opts).to include({ priority: 2000 })
original_enqueue.call(enqueued_job, opts)
end
opts[:priority] = 2000
Enqueuer.new(wrapped_job, opts).enqueue_pollable
end
end
end
end

Expand Down
94 changes: 94 additions & 0 deletions spec/unit/jobs/pollable_job_wrapper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,76 @@ class BigException < StandardError
expect(job_record.reload.state).to eq('COMPLETE')
end

context 'when dynamic job priorities are enabled' do
before do
TestConfig.config[:jobs][:enable_dynamic_job_priorities] = true
end

context 'when there are several active jobs for the current user' do
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) }
let(:pollable_job) { PollableJobWrapper.new(job) }

before do
stub_const('VCAP::CloudController::SecurityContext', security_context)
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
end

it 'adds +1 to base priority for each active job' do
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }

enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
expect(enqueued_job.priority).to eq(23)
end

context 'when SecurityContext does not have a user guid' do
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({}) }) }

before do
stub_const('VCAP::CloudController::SecurityContext', security_context)
end

it 'does not change its delayed job\'s base priority' do
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }

enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
expect(enqueued_job.priority).to eq(20)
end

it 'uses nil for the user_guid' do
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }

enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue

job_record = VCAP::CloudController::PollableJobModel.find(delayed_job_guid: enqueued_job.guid)
expect(job_record.user_guid).to be_nil
end
end
end
end

context 'when dynamic job priorities are disabled' do
context 'when there are several active jobs for the current user' do
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) }
let(:pollable_job) { PollableJobWrapper.new(job) }

before do
stub_const('VCAP::CloudController::SecurityContext', security_context)
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
end

it 'does not change its delayed job\'s base priority' do
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }

enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
expect(enqueued_job.priority).to eq(20)
end
end
end

context 'reusing a pollable job' do
let!(:existing) { VCAP::CloudController::PollableJobModel.make }
let(:pollable_job) { PollableJobWrapper.new(job, existing_guid: existing.guid) }
Expand All @@ -60,6 +130,30 @@ class BigException < StandardError
expect(job_record.cf_api_error).to be_nil
end

context 'when dynamic job priorities are enabled' do
before do
TestConfig.config[:jobs][:enable_dynamic_job_priorities] = true
end

context 'when there are several active jobs for the current user' do
let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) }

before do
stub_const('VCAP::CloudController::SecurityContext', security_context)
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue
end

it 'does not change its delayed job\'s base priority' do
TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 }

enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue
expect(enqueued_job.priority).to eq(20)
end
end
end

context 'when the job defines its state' do
before do
job.define_singleton_method(:pollable_job_state) do
Expand Down
12 changes: 12 additions & 0 deletions spec/unit/jobs/reoccurring_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ def perform
expect(PollableJobModel.first.delayed_job_guid).not_to eq(pollable_job.delayed_job_guid)
end

it 'keeps the delayed job\'s priority when re-enqueuing' do
TestConfig.config[:jobs][:priorities] = { 'fake-job': 20 }

pollable_job = Jobs::Enqueuer.new(FakeJob.new, { queue: Jobs::Queues.generic, priority: 22 }).enqueue_pollable
expect(Delayed::Job.where(guid: PollableJobModel.first.delayed_job_guid).first[:priority]).to eq(22)

execute_all_jobs(expected_successes: 1, expected_failures: 0, jobs_to_execute: 1)

expect(Delayed::Job.where(guid: PollableJobModel.first.delayed_job_guid).first[:priority]).to eq(22)
expect(PollableJobModel.first.delayed_job_guid).not_to eq(pollable_job.delayed_job_guid)
end

it 'waits for the polling interval' do
job = FakeJob.new
job.polling_interval_seconds = 95
Expand Down
Loading

0 comments on commit 3eaad82

Please sign in to comment.