Skip to content

Commit

Permalink
Move DelayedWorker to separate class (#3902)
Browse files Browse the repository at this point in the history
  • Loading branch information
johha authored Jul 26, 2024
1 parent 91f58c7 commit 3d09a70
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 59 deletions.
58 changes: 58 additions & 0 deletions lib/delayed_job/delayed_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
class CloudController::DelayedWorker
def initialize(options)
@queue_options = {
min_priority: ENV.fetch('MIN_PRIORITY', nil),
max_priority: ENV.fetch('MAX_PRIORITY', nil),
queues: options.fetch(:queues),
worker_name: options[:name],
quiet: true
}
end

def start_working
config = RakeConfig.config
BackgroundJobEnvironment.new(config).setup_environment(readiness_port)

logger = Steno.logger('cc-worker')
logger.info("Starting job with options #{@queue_options}")

setup_app_log_emitter(config, logger)
Delayed::Worker.destroy_failed_jobs = false
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = config.get(:jobs, :global, :timeout_in_seconds) + 1
Delayed::Worker.logger = logger
worker = Delayed::Worker.new(@queue_options)
worker.name = @queue_options[:worker_name]
worker.start
end

private

def setup_app_log_emitter(config, logger)
VCAP::AppLogEmitter.fluent_emitter = fluent_emitter(config) if config.get(:fluent)
if config.get(:loggregator) && config.get(
:loggregator, :router
)
VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Emitter.new(config.get(:loggregator, :router), 'cloud_controller', 'API',
config.get(:index))
end

VCAP::AppLogEmitter.logger = logger
end

def fluent_emitter(config)
VCAP::FluentEmitter.new(Fluent::Logger::FluentLogger.new(nil,
host: config.get(:fluent, :host) || 'localhost',
port: config.get(:fluent, :port) || 24_224))
end

def readiness_port
return unless is_first_generic_worker_on_machine?

RakeConfig.config.get(:readiness_port, :cloud_controller_worker)
end

def is_first_generic_worker_on_machine?
RakeConfig.context != :api && ENV['INDEX']&.to_i == 1
end
end
60 changes: 1 addition & 59 deletions lib/tasks/jobs.rake
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'delayed_job/quit_trap'
require 'delayed_job/delayed_worker'

namespace :jobs do
desc 'Clear the delayed_job queue.'
Expand Down Expand Up @@ -52,63 +53,4 @@ namespace :jobs do
CloudController::DelayedWorker.new(queues: queues,
name: args.name).start_working
end

class CloudController::DelayedWorker
def initialize(options)
@queue_options = {
min_priority: ENV.fetch('MIN_PRIORITY', nil),
max_priority: ENV.fetch('MAX_PRIORITY', nil),
queues: options.fetch(:queues),
worker_name: options[:name],
quiet: true
}
end

def start_working
config = RakeConfig.config
BackgroundJobEnvironment.new(config).setup_environment(readiness_port)

logger = Steno.logger('cc-worker')
logger.info("Starting job with options #{@queue_options}")

setup_app_log_emitter(config, logger)
Delayed::Worker.destroy_failed_jobs = false
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = config.get(:jobs, :global, :timeout_in_seconds) + 1
Delayed::Worker.logger = logger
worker = Delayed::Worker.new(@queue_options)
worker.name = @queue_options[:worker_name]
worker.start
end

private

def setup_app_log_emitter(config, logger)
VCAP::AppLogEmitter.fluent_emitter = fluent_emitter(config) if config.get(:fluent)
if config.get(:loggregator) && config.get(
:loggregator, :router
)
VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Emitter.new(config.get(:loggregator, :router), 'cloud_controller', 'API',
config.get(:index))
end

VCAP::AppLogEmitter.logger = logger
end

def fluent_emitter(config)
VCAP::FluentEmitter.new(Fluent::Logger::FluentLogger.new(nil,
host: config.get(:fluent, :host) || 'localhost',
port: config.get(:fluent, :port) || 24_224))
end

def readiness_port
return unless is_first_generic_worker_on_machine?

RakeConfig.config.get(:readiness_port, :cloud_controller_worker)
end

def is_first_generic_worker_on_machine?
RakeConfig.context != :api && ENV['INDEX']&.to_i == 1
end
end
end
49 changes: 49 additions & 0 deletions spec/unit/lib/delayed_job/delayed_worker_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
require 'spec_helper'
require 'tasks/rake_config'
require 'delayed_job/delayed_worker'

RSpec.describe CloudController::DelayedWorker do
let(:options) { { queues: 'default', name: 'test_worker' } }
let(:environment) { instance_double(BackgroundJobEnvironment, setup_environment: nil) }
let(:worker) { instance_double(Delayed::Worker, start: nil) }

before do
allow(RakeConfig).to receive(:config).and_return(TestConfig.config_instance)
allow(BackgroundJobEnvironment).to receive(:new).with(anything).and_return(environment)
allow(Delayed::Worker).to receive(:new).and_return(worker)
allow(worker).to receive(:name=).with(anything)
end

describe '#initialize' do
it 'sets the correct queue options' do
worker_instance = CloudController::DelayedWorker.new(options)
expect(worker_instance.instance_variable_get(:@queue_options)).to eq({
min_priority: nil,
max_priority: nil,
queues: 'default',
worker_name: 'test_worker',
quiet: true
})
end
end

describe '#start_working' do
let(:worker_instance) { CloudController::DelayedWorker.new(options) }

it 'sets up the environment and starts the worker' do
expect(environment).to receive(:setup_environment).with(anything)
expect(worker).to receive(:name=).with('test_worker')
expect(worker).to receive(:start)

worker_instance.start_working
end

it 'configures Delayed::Worker settings' do
worker_instance.start_working

expect(Delayed::Worker.destroy_failed_jobs).to be false
expect(Delayed::Worker.max_attempts).to eq(3)
expect(Delayed::Worker.max_run_time).to eq(14_401)
end
end
end

0 comments on commit 3d09a70

Please sign in to comment.