From 3d09a707b253acbdf54dff659a329f4267c840f3 Mon Sep 17 00:00:00 2001 From: Johannes Haass Date: Fri, 26 Jul 2024 10:09:31 +0200 Subject: [PATCH] Move `DelayedWorker` to separate class (#3902) --- lib/delayed_job/delayed_worker.rb | 58 ++++++++++++++++++ lib/tasks/jobs.rake | 60 +------------------ .../lib/delayed_job/delayed_worker_spec.rb | 49 +++++++++++++++ 3 files changed, 108 insertions(+), 59 deletions(-) create mode 100644 lib/delayed_job/delayed_worker.rb create mode 100644 spec/unit/lib/delayed_job/delayed_worker_spec.rb diff --git a/lib/delayed_job/delayed_worker.rb b/lib/delayed_job/delayed_worker.rb new file mode 100644 index 00000000000..77df1309fe3 --- /dev/null +++ b/lib/delayed_job/delayed_worker.rb @@ -0,0 +1,58 @@ +class CloudController::DelayedWorker + def initialize(options) + @queue_options = { + min_priority: ENV.fetch('MIN_PRIORITY', nil), + max_priority: ENV.fetch('MAX_PRIORITY', nil), + queues: options.fetch(:queues), + worker_name: options[:name], + quiet: true + } + end + + def start_working + config = RakeConfig.config + BackgroundJobEnvironment.new(config).setup_environment(readiness_port) + + logger = Steno.logger('cc-worker') + logger.info("Starting job with options #{@queue_options}") + + setup_app_log_emitter(config, logger) + Delayed::Worker.destroy_failed_jobs = false + Delayed::Worker.max_attempts = 3 + Delayed::Worker.max_run_time = config.get(:jobs, :global, :timeout_in_seconds) + 1 + Delayed::Worker.logger = logger + worker = Delayed::Worker.new(@queue_options) + worker.name = @queue_options[:worker_name] + worker.start + end + + private + + def setup_app_log_emitter(config, logger) + VCAP::AppLogEmitter.fluent_emitter = fluent_emitter(config) if config.get(:fluent) + if config.get(:loggregator) && config.get( + :loggregator, :router + ) + VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Emitter.new(config.get(:loggregator, :router), 'cloud_controller', 'API', + config.get(:index)) + end + + VCAP::AppLogEmitter.logger = logger + end + + def fluent_emitter(config) + VCAP::FluentEmitter.new(Fluent::Logger::FluentLogger.new(nil, + host: config.get(:fluent, :host) || 'localhost', + port: config.get(:fluent, :port) || 24_224)) + end + + def readiness_port + return unless is_first_generic_worker_on_machine? + + RakeConfig.config.get(:readiness_port, :cloud_controller_worker) + end + + def is_first_generic_worker_on_machine? + RakeConfig.context != :api && ENV['INDEX']&.to_i == 1 + end +end diff --git a/lib/tasks/jobs.rake b/lib/tasks/jobs.rake index a18924f65df..8733224fd1d 100644 --- a/lib/tasks/jobs.rake +++ b/lib/tasks/jobs.rake @@ -1,4 +1,5 @@ require 'delayed_job/quit_trap' +require 'delayed_job/delayed_worker' namespace :jobs do desc 'Clear the delayed_job queue.' @@ -52,63 +53,4 @@ namespace :jobs do CloudController::DelayedWorker.new(queues: queues, name: args.name).start_working end - - class CloudController::DelayedWorker - def initialize(options) - @queue_options = { - min_priority: ENV.fetch('MIN_PRIORITY', nil), - max_priority: ENV.fetch('MAX_PRIORITY', nil), - queues: options.fetch(:queues), - worker_name: options[:name], - quiet: true - } - end - - def start_working - config = RakeConfig.config - BackgroundJobEnvironment.new(config).setup_environment(readiness_port) - - logger = Steno.logger('cc-worker') - logger.info("Starting job with options #{@queue_options}") - - setup_app_log_emitter(config, logger) - Delayed::Worker.destroy_failed_jobs = false - Delayed::Worker.max_attempts = 3 - Delayed::Worker.max_run_time = config.get(:jobs, :global, :timeout_in_seconds) + 1 - Delayed::Worker.logger = logger - worker = Delayed::Worker.new(@queue_options) - worker.name = @queue_options[:worker_name] - worker.start - end - - private - - def setup_app_log_emitter(config, logger) - VCAP::AppLogEmitter.fluent_emitter = fluent_emitter(config) if config.get(:fluent) - if config.get(:loggregator) && config.get( - :loggregator, :router - ) - VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Emitter.new(config.get(:loggregator, :router), 'cloud_controller', 'API', - config.get(:index)) - end - - VCAP::AppLogEmitter.logger = logger - end - - def fluent_emitter(config) - VCAP::FluentEmitter.new(Fluent::Logger::FluentLogger.new(nil, - host: config.get(:fluent, :host) || 'localhost', - port: config.get(:fluent, :port) || 24_224)) - end - - def readiness_port - return unless is_first_generic_worker_on_machine? - - RakeConfig.config.get(:readiness_port, :cloud_controller_worker) - end - - def is_first_generic_worker_on_machine? - RakeConfig.context != :api && ENV['INDEX']&.to_i == 1 - end - end end diff --git a/spec/unit/lib/delayed_job/delayed_worker_spec.rb b/spec/unit/lib/delayed_job/delayed_worker_spec.rb new file mode 100644 index 00000000000..a9743773ed7 --- /dev/null +++ b/spec/unit/lib/delayed_job/delayed_worker_spec.rb @@ -0,0 +1,49 @@ +require 'spec_helper' +require 'tasks/rake_config' +require 'delayed_job/delayed_worker' + +RSpec.describe CloudController::DelayedWorker do + let(:options) { { queues: 'default', name: 'test_worker' } } + let(:environment) { instance_double(BackgroundJobEnvironment, setup_environment: nil) } + let(:worker) { instance_double(Delayed::Worker, start: nil) } + + before do + allow(RakeConfig).to receive(:config).and_return(TestConfig.config_instance) + allow(BackgroundJobEnvironment).to receive(:new).with(anything).and_return(environment) + allow(Delayed::Worker).to receive(:new).and_return(worker) + allow(worker).to receive(:name=).with(anything) + end + + describe '#initialize' do + it 'sets the correct queue options' do + worker_instance = CloudController::DelayedWorker.new(options) + expect(worker_instance.instance_variable_get(:@queue_options)).to eq({ + min_priority: nil, + max_priority: nil, + queues: 'default', + worker_name: 'test_worker', + quiet: true + }) + end + end + + describe '#start_working' do + let(:worker_instance) { CloudController::DelayedWorker.new(options) } + + it 'sets up the environment and starts the worker' do + expect(environment).to receive(:setup_environment).with(anything) + expect(worker).to receive(:name=).with('test_worker') + expect(worker).to receive(:start) + + worker_instance.start_working + end + + it 'configures Delayed::Worker settings' do + worker_instance.start_working + + expect(Delayed::Worker.destroy_failed_jobs).to be false + expect(Delayed::Worker.max_attempts).to eq(3) + expect(Delayed::Worker.max_run_time).to eq(14_401) + end + end +end