From 4f070f7c52e0593cd78f8a0cb9cef0699dea6264 Mon Sep 17 00:00:00 2001
From: Adam Grare
Date: Thu, 25 Jul 2024 16:45:12 -0400
Subject: [PATCH 1/2] Set MiqWorker system_uid if missing on kubernetes

---
 .../worker_management/kubernetes.rb       | 13 +++++++
 .../worker_management/kubernetes_spec.rb  | 35 +++++++++++++++++--
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/app/models/miq_server/worker_management/kubernetes.rb b/app/models/miq_server/worker_management/kubernetes.rb
index bb9fb528cac..224ebd2501e 100644
--- a/app/models/miq_server/worker_management/kubernetes.rb
+++ b/app/models/miq_server/worker_management/kubernetes.rb
@@ -23,10 +23,23 @@ def sync_from_system
   def sync_starting_workers
     starting = MiqWorker.find_all_starting
 
+    # Get a list of pods that aren't currently assigned to MiqWorker records
+    pods_without_workers = current_pods.keys - MiqWorker.server_scope.pluck(:system_uid).compact
+
     # Non-rails workers cannot set their own miq_worker record to started once they
     # have finished initializing. Check for any starting non-rails workers whose
     # pod is running and mark the miq_worker as started.
     starting.reject(&:rails_worker?).each do |worker|
+      # If the current worker doesn't have a system_uid assigned then find the first
+      # pod available for our worker type and link them up.
+      if worker.system_uid.nil?
+        system_uid = pods_without_workers.detect { |pod_name| pod_name.start_with?(worker.worker_deployment_name) }
+        next if system_uid.nil?
+
+        pods_without_workers.delete(system_uid)
+        worker.update!(:system_uid => system_uid)
+      end
+
       worker_pod = current_pods[worker.system_uid]
       next if worker_pod.nil?
 
diff --git a/spec/models/miq_server/worker_management/kubernetes_spec.rb b/spec/models/miq_server/worker_management/kubernetes_spec.rb
index 407dde8cf19..47af29a7ee2 100644
--- a/spec/models/miq_server/worker_management/kubernetes_spec.rb
+++ b/spec/models/miq_server/worker_management/kubernetes_spec.rb
@@ -215,9 +215,10 @@
 
     context "podified" do
       let(:rails_worker) { true }
-      let(:pod_name)     { "1-generic-abcd" }
+      let(:pod_name)     { "#{server.compressed_id}-generic-abcd" }
       let(:pod_running)  { true }
-      let(:worker)       { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => pod_name) }
+      let(:worker)       { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => system_uid) }
+      let(:system_uid)   { pod_name }
       let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }
 
       before do
@@ -270,6 +271,36 @@
           expect(server.worker_manager.sync_starting_workers).to include(worker)
         end
       end
+
+      context "with a worker that doesn't have a system_uid yet" do
+        let(:system_uid) { nil }
+
+        context "without a pod" do
+          let(:current_pods) { {} }
+
+          it "returns the worker as still starting" do
+            expect(server.worker_manager.sync_starting_workers).to include(worker)
+          end
+        end
+
+        context "with a pod that is running" do
+          let(:pod_running) { true }
+
+          it "sets the worker's system_uid and marks the worker started" do
+            expect(server.worker_manager.sync_starting_workers).to be_empty
+            expect(worker.reload.system_uid).to eq(pod_name)
+          end
+        end
+
+        context "with a pod that isn't running" do
+          let(:pod_running) { false }
+
+          it "sets the worker's system_uid but doesn't mark the worker started" do
+            expect(server.worker_manager.sync_starting_workers).to include(worker)
+            expect(worker.reload.system_uid).to eq(pod_name)
+          end
+        end
+      end
     end
   end
 end
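
Note on the pairing logic above: the sketch below is a minimal standalone
illustration of how a starting worker without a system_uid claims an
unassigned pod. The Worker struct and the literal pod names are hypothetical
stand-ins for the MiqWorker record and current_pods.keys; only the
detect/delete/assign flow mirrors the patch.

    # Hypothetical inputs standing in for current_pods.keys minus the
    # system_uids already claimed by MiqWorker rows.
    pods_without_workers = ["1-generic-abcd", "1-priority-efgh"]

    Worker = Struct.new(:system_uid, :worker_deployment_name)
    worker = Worker.new(nil, "1-generic")

    # Claim the first unassigned pod whose name matches this worker's
    # deployment, then remove it so no other worker record can claim it.
    system_uid = pods_without_workers.detect { |pod_name| pod_name.start_with?(worker.worker_deployment_name) }

    if system_uid
      pods_without_workers.delete(system_uid)
      worker.system_uid = system_uid # the real code persists this via update!
    end

    worker.system_uid    # => "1-generic-abcd"
    pods_without_workers # => ["1-priority-efgh"]
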
From d7a2b575f09837b1b3cd1d171d82e972300250f8 Mon Sep 17 00:00:00 2001
From: Adam Grare
Date: Thu, 8 Aug 2024 11:47:07 -0400
Subject: [PATCH 2/2] Skip starting worker records and check for starting timeout

---
 .../worker_management/kubernetes.rb       | 23 +++++---
 .../worker_management/kubernetes_spec.rb  | 30 +++++++++----
 2 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/app/models/miq_server/worker_management/kubernetes.rb b/app/models/miq_server/worker_management/kubernetes.rb
index 224ebd2501e..841ad198549 100644
--- a/app/models/miq_server/worker_management/kubernetes.rb
+++ b/app/models/miq_server/worker_management/kubernetes.rb
@@ -34,10 +34,21 @@ def sync_starting_workers
       # pod available for our worker type and link them up.
       if worker.system_uid.nil?
         system_uid = pods_without_workers.detect { |pod_name| pod_name.start_with?(worker.worker_deployment_name) }
-        next if system_uid.nil?
-
-        pods_without_workers.delete(system_uid)
-        worker.update!(:system_uid => system_uid)
+        if system_uid
+          # We have found a pod for the current worker record so remove the pod from
+          # the list of pods without workers and set the pod name as the system_uid
+          # for the current worker record.
+          pods_without_workers.delete(system_uid)
+          worker.update!(:system_uid => system_uid)
+        else
+          # If we haven't found a pod for this worker record then we need to check
+          # whether it has been starting for too long and should be marked as
+          # not responding.
+          stop_worker(worker, MiqServer::WorkerManagement::NOT_RESPONDING) if exceeded_heartbeat_threshold?(worker)
+
+          # Without a valid system_uid we cannot run any further logic in this
+          # loop iteration.
+          next
+        end
       end
 
       worker_pod = current_pods[worker.system_uid]
@@ -67,7 +78,11 @@ def enough_resource_to_start_worker?(_worker_class)
 
   def cleanup_orphaned_worker_rows
     unless current_pods.empty?
+      # Any worker rows whose system_uid is not in the list of current pod
+      # names and which are not starting (i.e. haven't had a system_uid set
+      # yet) should be deleted.
       orphaned_rows = miq_workers.where.not(:system_uid => current_pods.keys)
+                                 .where.not(:status => MiqWorker::STATUSES_STARTING)
       unless orphaned_rows.empty?
         _log.warn("Removing orphaned worker rows without corresponding pods: #{orphaned_rows.collect(&:system_uid).inspect}")
         orphaned_rows.destroy_all
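
Note on the cleanup change above: the added .where.not(:status => ...) scope
exists because a starting worker row legitimately has no system_uid yet, so
without it the orphan cleanup would delete rows for workers whose pods simply
haven't been paired up. Below is a plain-Ruby approximation over hashes, with
hypothetical rows and an assumed starting-status list, not the real
ActiveRecord scope:

    current_pod_names = ["1-generic-abcd"]
    statuses_starting = %w[queueing starting] # assumed stand-in for MiqWorker::STATUSES_STARTING

    rows = [
      {:system_uid => "1-generic-abcd", :status => "started"},  # pod exists: kept
      {:system_uid => "1-generic-wxyz", :status => "started"},  # pod gone: orphaned
      {:system_uid => nil,              :status => "starting"}  # not yet paired: kept
    ]

    orphaned = rows.reject { |r| current_pod_names.include?(r[:system_uid]) }
                   .reject { |r| statuses_starting.include?(r[:status]) }

    orphaned.map { |r| r[:system_uid] } # => ["1-generic-wxyz"]
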
diff --git a/spec/models/miq_server/worker_management/kubernetes_spec.rb b/spec/models/miq_server/worker_management/kubernetes_spec.rb
index 47af29a7ee2..359706d7fba 100644
--- a/spec/models/miq_server/worker_management/kubernetes_spec.rb
+++ b/spec/models/miq_server/worker_management/kubernetes_spec.rb
@@ -214,14 +214,16 @@
     before { MiqWorkerType.seed }
 
     context "podified" do
-      let(:rails_worker) { true }
-      let(:pod_name)     { "#{server.compressed_id}-generic-abcd" }
-      let(:pod_running)  { true }
-      let(:worker)       { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => system_uid) }
-      let(:system_uid)   { pod_name }
-      let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }
+      let(:rails_worker)   { true }
+      let(:pod_name)       { "#{server.compressed_id}-generic-abcd" }
+      let(:pod_running)    { true }
+      let(:last_heartbeat) { Time.now.utc }
+      let(:worker)         { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => system_uid, :last_heartbeat => last_heartbeat) }
+      let(:system_uid)     { pod_name }
+      let(:current_pods)   { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }
 
       before do
+        allow(worker.class).to receive(:containerized_worker?).and_return(true)
         allow(worker.class).to receive(:rails_worker?).and_return(rails_worker)
         server.worker_manager.current_pods = current_pods
       end
@@ -275,12 +277,28 @@
       context "with a worker that doesn't have a system_uid yet" do
         let(:system_uid) { nil }
 
+        before { server.worker_manager.sync_config }
+
         context "without a pod" do
           let(:current_pods) { {} }
 
           it "returns the worker as still starting" do
             expect(server.worker_manager.sync_starting_workers).to include(worker)
           end
+
+          context "with a worker that has been starting longer than the starting_timeout" do
+            let(:last_heartbeat) { 20.minutes.ago.utc }
+
+            it "marks the worker as not responding" do
+              # Make sure that #find_worker returns our instance of worker
+              # that stubs the #stop_container method.
+              expect(server.worker_manager).to receive(:find_worker).with(worker).and_return(worker)
+              expect(worker).to receive(:stop_container)
+
+              server.worker_manager.sync_starting_workers
+              expect(worker.reload.status).to eq("stopping")
+            end
+          end
         end
 
         context "with a pod that is running" do
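
Note on the starting-timeout path exercised by the last spec: a worker that
has no pod and whose last_heartbeat is older than the starting timeout is
handed to stop_worker with NOT_RESPONDING, which the spec observes as status
"stopping". Below is a minimal sketch of that decision, assuming a 10-minute
timeout; the real threshold comes from exceeded_heartbeat_threshold? and the
worker settings loaded by sync_config, not from this constant:

    # Assumed value for illustration only; the actual timeout is configured
    # per worker type in the server settings.
    STARTING_TIMEOUT = 10 * 60 # seconds

    def starting_timed_out?(last_heartbeat, now = Time.now.utc)
      (now - last_heartbeat) > STARTING_TIMEOUT
    end

    starting_timed_out?(Time.now.utc - (20 * 60)) # => true, the 20.minutes.ago spec case
    starting_timed_out?(Time.now.utc)             # => false, freshly heartbeating worker
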