Skip to content

Commit

Permalink
Merge pull request #23112 from agrare/miq_worker_worker_management_ku…
Browse files Browse the repository at this point in the history
…bernetes_non_rails_system_uid

Link MiqWorker record to a running pod when not created using run_single_worker.rb
  • Loading branch information
Fryguy authored Aug 22, 2024
2 parents a447b9f + d7a2b57 commit a04c02d
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 5 deletions.
28 changes: 28 additions & 0 deletions app/models/miq_server/worker_management/kubernetes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,34 @@ def sync_from_system
def sync_starting_workers
starting = MiqWorker.find_all_starting

# Get a list of pods that aren't currently assigned to MiqWorker records
pods_without_workers = current_pods.keys - MiqWorker.server_scope.pluck(:system_uid).compact

# Non-rails workers cannot set their own miq_worker record to started once they
# have finished initializing. Check for any starting non-rails workers whose
# pod is running and mark the miq_worker as started.
starting.reject(&:rails_worker?).each do |worker|
# If the current worker doesn't have a system_uid assigned then find the first
# pod available for our worker type and link them up.
if worker.system_uid.nil?
system_uid = pods_without_workers.detect { |pod_name| pod_name.start_with?(worker.worker_deployment_name) }
if system_uid
# We have found a pod for the current worker record so remove the pod from
# the list of pods without workers and set the pod name as the system_uid
# for the current worker record.
pods_without_workers.delete(system_uid)
worker.update!(:system_uid => system_uid)
else
# If we haven't found a pod for this worker record then we need to check
# whether it has been starting for too long and should be marked as
# not responding.
stop_worker(worker, MiqServer::WorkerManagement::NOT_RESPONDING) if exceeded_heartbeat_threshold?(worker)
# Without a valid system_uid we cannot run any further logic in this
# loop.
next
end
end

worker_pod = current_pods[worker.system_uid]
next if worker_pod.nil?

Expand Down Expand Up @@ -54,7 +78,11 @@ def enough_resource_to_start_worker?(_worker_class)

def cleanup_orphaned_worker_rows
unless current_pods.empty?
# Any worker rows which have a system_uid that is not in the list of
# current pod names, and is not starting (aka hasn't had a system_uid set
# yet) should be deleted.
orphaned_rows = miq_workers.where.not(:system_uid => current_pods.keys)
.where.not(:status => MiqWorker::STATUSES_STARTING)
unless orphaned_rows.empty?
_log.warn("Removing orphaned worker rows without corresponding pods: #{orphaned_rows.collect(&:system_uid).inspect}")
orphaned_rows.destroy_all
Expand Down
59 changes: 54 additions & 5 deletions spec/models/miq_server/worker_management/kubernetes_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,16 @@
before { MiqWorkerType.seed }

context "podified" do
let(:rails_worker) { true }
let(:pod_name) { "1-generic-abcd" }
let(:pod_running) { true }
let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => pod_name) }
let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }
let(:rails_worker) { true }
let(:pod_name) { "#{server.compressed_id}-generic-abcd" }
let(:pod_running) { true }
let(:last_heartbeat) { Time.now.utc }
let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => system_uid, :last_heartbeat => last_heartbeat) }
let(:system_uid) { pod_name }
let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }

before do
allow(worker.class).to receive(:containerized_worker?).and_return(true)
allow(worker.class).to receive(:rails_worker?).and_return(rails_worker)
server.worker_manager.current_pods = current_pods
end
Expand Down Expand Up @@ -270,6 +273,52 @@
expect(server.worker_manager.sync_starting_workers).to include(worker)
end
end

context "with a worker that doesn't have a system_uid yet" do
let(:system_uid) { nil }

before { server.worker_manager.sync_config }

context "without a pod" do
let(:current_pods) { {} }

it "returns the worker as still starting" do
expect(server.worker_manager.sync_starting_workers).to include(worker)
end

context "with a worker that has been starting longer than the starting_timeout" do
let(:last_heartbeat) { 20.minutes.ago.utc }

it "marks the worker as not responding" do
# Make sure that #find_worker returns our instance of worker
# that stubs the #stop_container method.
expect(server.worker_manager).to receive(:find_worker).with(worker).and_return(worker)
expect(worker).to receive(:stop_container)

server.worker_manager.sync_starting_workers
expect(worker.reload.status).to eq("stopping")
end
end
end

context "with a pod that is running" do
let(:pod_running) { true }

it "sets the worker's system_uid and marks the worker started" do
expect(server.worker_manager.sync_starting_workers).to be_empty
expect(worker.reload.system_uid).to eq(pod_name)
end
end

context "with a pod that isn't running" do
let(:pod_running) { false }

it "sets the worker's system_uid but doesn't mark the worker started" do
expect(server.worker_manager.sync_starting_workers).to include(worker)
expect(worker.reload.system_uid).to eq(pod_name)
end
end
end
end
end
end
Expand Down

0 comments on commit a04c02d

Please sign in to comment.