From 9df4f8981400d2f4a31be1cb1bddd33a5b08856b Mon Sep 17 00:00:00 2001 From: Sven Krieger <37476281+svkrieger@users.noreply.github.com> Date: Mon, 18 Mar 2024 08:09:05 +0100 Subject: [PATCH] Add dynamic job priorities - DB migration to add user_guid field to jobs table - When job is enqueued, get currently active jobs of that user and add +1 to job priority for each active job Use original priority for reoccuring jobs - Pass on priority of current delayed job to next delayed job in reoccurring job - Don't overwrite priority, which was passed into Enqueuer, with base priority Add unit tests Use concurrently for index operations. Small refactoring. Add config parameter for enabling dynamic job priorities Add migration tests and fix migration Add test to enqueuer_spec Use index name when dropping an index --- app/jobs/enqueuer.rb | 2 +- app/jobs/pollable_job_wrapper.rb | 9 +- app/jobs/reoccurring_job.rb | 7 +- app/models/runtime/pollable_job_model.rb | 4 + ...40314131908_add_user_guid_to_jobs_table.rb | 39 ++++++++ .../config_schemas/base/api_schema.rb | 1 + .../config_schemas/base/worker_schema.rb | 1 + ...131908_add_user_guid_to_jobs_table_spec.rb | 90 ++++++++++++++++++ spec/unit/jobs/enqueuer_spec.rb | 22 +++++ spec/unit/jobs/pollable_job_wrapper_spec.rb | 94 +++++++++++++++++++ spec/unit/jobs/reoccurring_job_spec.rb | 12 +++ .../models/runtime/pollable_job_model_spec.rb | 30 ++++++ 12 files changed, 306 insertions(+), 5 deletions(-) create mode 100644 db/migrations/20240314131908_add_user_guid_to_jobs_table.rb create mode 100644 spec/migrations/20240314131908_add_user_guid_to_jobs_table_spec.rb diff --git a/app/jobs/enqueuer.rb b/app/jobs/enqueuer.rb index c6af52a4fd9..1d3ac32aca7 100644 --- a/app/jobs/enqueuer.rb +++ b/app/jobs/enqueuer.rb @@ -43,7 +43,7 @@ def enqueue_job(job) request_id = ::VCAP::Request.current_id timeout_job = TimeoutJob.new(job, job_timeout) logging_context_job = LoggingContextJob.new(timeout_job, request_id) - @opts[:priority] = job_priority unless job_priority.nil? + @opts[:priority] = job_priority unless @opts[:priority] || job_priority.nil? Delayed::Job.enqueue(logging_context_job, @opts) end diff --git a/app/jobs/pollable_job_wrapper.rb b/app/jobs/pollable_job_wrapper.rb index 5232ca3d7dc..eb1fe78fdab 100644 --- a/app/jobs/pollable_job_wrapper.rb +++ b/app/jobs/pollable_job_wrapper.rb @@ -24,12 +24,19 @@ def before_enqueue(job) resource_type: @handler.resource_type ) else + user_guid = VCAP::CloudController::UserAuditInfo.from_context(VCAP::CloudController::SecurityContext).user_guid + + if VCAP::CloudController::Config.config.get(:jobs, :enable_dynamic_job_priorities) && user_guid + job.values[:priority] += PollableJobModel.number_of_active_jobs_by_user(user_guid) + end + PollableJobModel.create( delayed_job_guid: job.guid, state: PollableJobModel::PROCESSING_STATE, operation: @handler.display_name, resource_guid: @handler.resource_guid, - resource_type: @handler.resource_type + resource_type: @handler.resource_type, + user_guid: user_guid ) end end diff --git a/app/jobs/reoccurring_job.rb b/app/jobs/reoccurring_job.rb index cf29f7a4e01..4f375a964bf 100644 --- a/app/jobs/reoccurring_job.rb +++ b/app/jobs/reoccurring_job.rb @@ -13,7 +13,7 @@ def success(current_delayed_job) elsif next_enqueue_would_exceed_maximum_duration? expire! else - enqueue_next_job(pollable_job) + enqueue_next_job(pollable_job, current_delayed_job.priority) end end @@ -75,10 +75,11 @@ def expire! raise CloudController::Errors::ApiError.new_from_details('JobTimeout') end - def enqueue_next_job(pollable_job) + def enqueue_next_job(pollable_job, priority) opts = { queue: Jobs::Queues.generic, - run_at: Delayed::Job.db_time_now + next_execution_in + run_at: Delayed::Job.db_time_now + next_execution_in, + priority: priority } @retry_number += 1 diff --git a/app/models/runtime/pollable_job_model.rb b/app/models/runtime/pollable_job_model.rb index da7b86d46cb..29dbf4a44aa 100644 --- a/app/models/runtime/pollable_job_model.rb +++ b/app/models/runtime/pollable_job_model.rb @@ -46,5 +46,9 @@ def self.find_by_delayed_job_guid(delayed_job_guid) pollable_job end + + def self.number_of_active_jobs_by_user(user_guid) + PollableJobModel.where(state: %w[PROCESSING POLLING], user_guid: user_guid).count + end end end diff --git a/db/migrations/20240314131908_add_user_guid_to_jobs_table.rb b/db/migrations/20240314131908_add_user_guid_to_jobs_table.rb new file mode 100644 index 00000000000..70ab03668e2 --- /dev/null +++ b/db/migrations/20240314131908_add_user_guid_to_jobs_table.rb @@ -0,0 +1,39 @@ +Sequel.migration do + # adding an index concurrently cannot be done within a transaction + no_transaction + + up do + if database_type == :postgres + alter_table :jobs do + add_column :user_guid, String, size: 255, if_not_exists: true + add_index :user_guid, name: :jobs_user_guid_index, if_not_exists: true, concurrently: true + end + + elsif database_type == :mysql + alter_table :jobs do + add_column :user_guid, String, size: 255 unless @db.schema(:jobs).map(&:first).include?(:user_guid) + # rubocop:disable Sequel/ConcurrentIndex + add_index :user_guid, name: :jobs_user_guid_index unless @db.indexes(:jobs).include?(:jobs_user_guid_index) + # rubocop:enable Sequel/ConcurrentIndex + end + end + end + + down do + if database_type == :postgres + alter_table :jobs do + drop_index :user_guid, name: :jobs_user_guid_index, if_exists: true, concurrently: true + drop_column :user_guid, if_exists: true + end + end + + if database_type == :mysql + alter_table :jobs do + # rubocop:disable Sequel/ConcurrentIndex + drop_index :user_guid, name: :jobs_user_guid_index if @db.indexes(:jobs).include?(:jobs_user_guid_index) + # rubocop:enable Sequel/ConcurrentIndex + drop_column :user_guid if @db.schema(:jobs).map(&:first).include?(:user_guid) + end + end + end +end diff --git a/lib/cloud_controller/config_schemas/base/api_schema.rb b/lib/cloud_controller/config_schemas/base/api_schema.rb index 2b74d0bc6e7..32f32bfd2f3 100644 --- a/lib/cloud_controller/config_schemas/base/api_schema.rb +++ b/lib/cloud_controller/config_schemas/base/api_schema.rb @@ -335,6 +335,7 @@ class ApiSchema < VCAP::Config jobs: { global: { timeout_in_seconds: Integer }, + optional(:enable_dynamic_job_priorities) => bool, optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer }, optional(:blobstore_delete) => { timeout_in_seconds: Integer }, optional(:diego_sync) => { timeout_in_seconds: Integer }, diff --git a/lib/cloud_controller/config_schemas/base/worker_schema.rb b/lib/cloud_controller/config_schemas/base/worker_schema.rb index 77b9c502e9a..224448bbfde 100644 --- a/lib/cloud_controller/config_schemas/base/worker_schema.rb +++ b/lib/cloud_controller/config_schemas/base/worker_schema.rb @@ -167,6 +167,7 @@ class WorkerSchema < VCAP::Config jobs: { global: { timeout_in_seconds: Integer }, + optional(:enable_dynamic_job_priorities) => bool, optional(:app_usage_events_cleanup) => { timeout_in_seconds: Integer }, optional(:blobstore_delete) => { timeout_in_seconds: Integer }, optional(:diego_sync) => { timeout_in_seconds: Integer }, diff --git a/spec/migrations/20240314131908_add_user_guid_to_jobs_table_spec.rb b/spec/migrations/20240314131908_add_user_guid_to_jobs_table_spec.rb new file mode 100644 index 00000000000..a4d2c8d87c3 --- /dev/null +++ b/spec/migrations/20240314131908_add_user_guid_to_jobs_table_spec.rb @@ -0,0 +1,90 @@ +require 'spec_helper' +require 'migrations/helpers/migration_shared_context' + +RSpec.describe 'migration to add user_guid column to jobs table and add an index for that column', isolation: :truncation do + include_context 'migration' do + let(:migration_filename) { '20240314131908_add_user_guid_to_jobs_table.rb' } + end + + describe 'jobs table' do + it 'adds a column `user_guid`' do + expect(db[:jobs].columns).not_to include(:user_guid) + expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error + expect(db[:jobs].columns).to include(:user_guid) + end + + it 'adds an index on the user_guid column' do + expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index) + expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error + expect(db.indexes(:jobs)).to include(:jobs_user_guid_index) + end + + describe 'idempotency of up' do + context '`user_guid` column already exists' do + before do + db.add_column :jobs, :user_guid, String, size: 255 + end + + it 'does not fail' do + expect(db[:jobs].columns).to include(:user_guid) + expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error + end + + it 'continues to create the index' do + expect(db[:jobs].columns).to include(:user_guid) + expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index) + expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error + expect(db.indexes(:jobs)).to include(:jobs_user_guid_index) + end + end + + context 'index already exists' do + before do + db.add_column :jobs, :user_guid, String, size: 255 + db.add_index :jobs, :user_guid, name: :jobs_user_guid_index + end + + it 'does not fail' do + expect(db.indexes(:jobs)).to include(:jobs_user_guid_index) + expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) }.not_to raise_error + end + end + end + + describe 'idempotency of down' do + context 'index does not exist' do + before do + Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) + db.drop_index :jobs, :user_guid, name: :jobs_user_guid_index + end + + it 'does not fail' do + expect(db[:jobs].columns).to include(:user_guid) + expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index) + expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error + end + + it 'continues to remove the `user_guid` column' do + expect(db[:jobs].columns).to include(:user_guid) + expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index) + expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error + expect(db[:jobs].columns).not_to include(:user_guid) + end + end + + context 'index and column do not exist' do + before do + Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true) + db.drop_index :jobs, :user_guid, name: :jobs_user_guid_index + db.drop_column :jobs, :user_guid + end + + it 'does not fail' do + expect(db[:jobs].columns).not_to include(:user_guid) + expect(db.indexes(:jobs)).not_to include(:jobs_user_guid_index) + expect { Sequel::Migrator.run(db, migration_to_test, allow_missing_migration_files: true, target: 0) }.not_to raise_error + end + end + end + end +end diff --git a/spec/unit/jobs/enqueuer_spec.rb b/spec/unit/jobs/enqueuer_spec.rb index 71d9915ce44..f18063e6142 100644 --- a/spec/unit/jobs/enqueuer_spec.rb +++ b/spec/unit/jobs/enqueuer_spec.rb @@ -1,3 +1,4 @@ +require 'spec_helper' require 'db_spec_helper' require 'jobs/enqueuer' require 'jobs/delete_action_job' @@ -133,6 +134,15 @@ module VCAP::CloudController::Jobs end end + it 'uses the default priority' do + original_enqueue = Delayed::Job.method(:enqueue) + expect(Delayed::Job).to receive(:enqueue) do |enqueued_job, opts| + expect(opts).not_to include(:priority) + original_enqueue.call(enqueued_job, opts) + end + Enqueuer.new(wrapped_job, opts).enqueue_pollable + end + context 'priority from config' do let(:priorities) { { priorities: { wrapped_job.display_name.to_sym => 1899 } } } @@ -144,6 +154,18 @@ module VCAP::CloudController::Jobs end Enqueuer.new(wrapped_job, opts).enqueue_pollable end + + context 'and priority from Enqueuer (e.g. from reoccurring jobs)' do + it 'uses the priority passed into the Enqueuer' do + original_enqueue = Delayed::Job.method(:enqueue) + expect(Delayed::Job).to receive(:enqueue) do |enqueued_job, opts| + expect(opts).to include({ priority: 2000 }) + original_enqueue.call(enqueued_job, opts) + end + opts[:priority] = 2000 + Enqueuer.new(wrapped_job, opts).enqueue_pollable + end + end end end diff --git a/spec/unit/jobs/pollable_job_wrapper_spec.rb b/spec/unit/jobs/pollable_job_wrapper_spec.rb index 2fed90d9bb9..6d4c398b3d1 100644 --- a/spec/unit/jobs/pollable_job_wrapper_spec.rb +++ b/spec/unit/jobs/pollable_job_wrapper_spec.rb @@ -38,6 +38,76 @@ class BigException < StandardError expect(job_record.reload.state).to eq('COMPLETE') end + context 'when dynamic job priorities are enabled' do + before do + TestConfig.config[:jobs][:enable_dynamic_job_priorities] = true + end + + context 'when there are several active jobs for the current user' do + let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) } + let(:pollable_job) { PollableJobWrapper.new(job) } + + before do + stub_const('VCAP::CloudController::SecurityContext', security_context) + VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue + VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue + VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue + end + + it 'adds +1 to base priority for each active job' do + TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 } + + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue + expect(enqueued_job.priority).to eq(23) + end + + context 'when SecurityContext does not have a user guid' do + let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({}) }) } + + before do + stub_const('VCAP::CloudController::SecurityContext', security_context) + end + + it 'does not change its delayed job\'s base priority' do + TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 } + + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue + expect(enqueued_job.priority).to eq(20) + end + + it 'uses nil for the user_guid' do + TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 } + + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue + + job_record = VCAP::CloudController::PollableJobModel.find(delayed_job_guid: enqueued_job.guid) + expect(job_record.user_guid).to be_nil + end + end + end + end + + context 'when dynamic job priorities are disabled' do + context 'when there are several active jobs for the current user' do + let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) } + let(:pollable_job) { PollableJobWrapper.new(job) } + + before do + stub_const('VCAP::CloudController::SecurityContext', security_context) + VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue + VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue + VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue + end + + it 'does not change its delayed job\'s base priority' do + TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 } + + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue + expect(enqueued_job.priority).to eq(20) + end + end + end + context 'reusing a pollable job' do let!(:existing) { VCAP::CloudController::PollableJobModel.make } let(:pollable_job) { PollableJobWrapper.new(job, existing_guid: existing.guid) } @@ -60,6 +130,30 @@ class BigException < StandardError expect(job_record.cf_api_error).to be_nil end + context 'when dynamic job priorities are enabled' do + before do + TestConfig.config[:jobs][:enable_dynamic_job_priorities] = true + end + + context 'when there are several active jobs for the current user' do + let(:security_context) { double({ current_user_email: 'user-email', current_user_name: 'user-name', current_user: double({ guid: 'user-guid' }) }) } + + before do + stub_const('VCAP::CloudController::SecurityContext', security_context) + VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue + VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue + VCAP::CloudController::Jobs::Enqueuer.new(PollableJobWrapper.new(job)).enqueue + end + + it 'does not change its delayed job\'s base priority' do + TestConfig.config[:jobs][:priorities] = { 'droplet.delete': 20 } + + enqueued_job = VCAP::CloudController::Jobs::Enqueuer.new(pollable_job).enqueue + expect(enqueued_job.priority).to eq(20) + end + end + end + context 'when the job defines its state' do before do job.define_singleton_method(:pollable_job_state) do diff --git a/spec/unit/jobs/reoccurring_job_spec.rb b/spec/unit/jobs/reoccurring_job_spec.rb index 84043888655..9668d45f92e 100644 --- a/spec/unit/jobs/reoccurring_job_spec.rb +++ b/spec/unit/jobs/reoccurring_job_spec.rb @@ -70,6 +70,18 @@ def perform expect(PollableJobModel.first.delayed_job_guid).not_to eq(pollable_job.delayed_job_guid) end + it 'keeps the delayed job\'s priority when re-enqueuing' do + TestConfig.config[:jobs][:priorities] = { 'fake-job': 20 } + + pollable_job = Jobs::Enqueuer.new(FakeJob.new, { queue: Jobs::Queues.generic, priority: 22 }).enqueue_pollable + expect(Delayed::Job.where(guid: PollableJobModel.first.delayed_job_guid).first[:priority]).to eq(22) + + execute_all_jobs(expected_successes: 1, expected_failures: 0, jobs_to_execute: 1) + + expect(Delayed::Job.where(guid: PollableJobModel.first.delayed_job_guid).first[:priority]).to eq(22) + expect(PollableJobModel.first.delayed_job_guid).not_to eq(pollable_job.delayed_job_guid) + end + it 'waits for the polling interval' do job = FakeJob.new job.polling_interval_seconds = 95 diff --git a/spec/unit/models/runtime/pollable_job_model_spec.rb b/spec/unit/models/runtime/pollable_job_model_spec.rb index fa03f9f4292..c35058ef99f 100644 --- a/spec/unit/models/runtime/pollable_job_model_spec.rb +++ b/spec/unit/models/runtime/pollable_job_model_spec.rb @@ -24,6 +24,36 @@ module VCAP::CloudController end end + describe('.number_of_active_jobs_by_user') do + let(:delayed_job) { Delayed::Backend::Sequel::Job.create } + + context 'with finished and active jobs' do + before do + PollableJobModel.create(state: 'PROCESSING', delayed_job_guid: delayed_job.guid, user_guid: 'some-user-guid') + PollableJobModel.create(state: 'POLLING', delayed_job_guid: delayed_job.guid, user_guid: 'some-user-guid') + PollableJobModel.create(state: 'FAILED', delayed_job_guid: delayed_job.guid, user_guid: 'some-user-guid') + PollableJobModel.create(state: 'COMPLETE', delayed_job_guid: delayed_job.guid, user_guid: 'some-user-guid') + end + + it 'returns the number of active jobs created by the user' do + result = PollableJobModel.number_of_active_jobs_by_user('some-user-guid') + expect(result).to eq(2) + end + end + + context 'without active jobs' do + before do + PollableJobModel.create(state: 'FAILED', delayed_job_guid: delayed_job.guid, user_guid: 'some-user-guid') + PollableJobModel.create(state: 'COMPLETE', delayed_job_guid: delayed_job.guid, user_guid: 'some-user-guid') + end + + it 'returns 0' do + result = PollableJobModel.number_of_active_jobs_by_user('some-user-guid') + expect(result).to eq(0) + end + end + end + describe '#complete?' do context 'when the state is complete' do let(:job) { PollableJobModel.make(state: 'COMPLETE') }