From a2e2c89844a50dcef33d1949e2ad6b32a4962c2f Mon Sep 17 00:00:00 2001 From: Stefan Hoffmann Date: Thu, 26 Oct 2023 20:44:49 +0200 Subject: [PATCH] Stop the dispatching of new messages when a SIGTERM signal has been received (#750) * Stop the dispatching of new messages when a SIGTERM signal has been received * Use ActiveSupport::Testing::TimeHelpers to fix a flaky test that checks the message deduplication id * Removed ActiveSupport::Testing::TimeHelpers as it is not available in rails 4 * Exclude `enqueued_at` in the body for the deduplication id --------- Co-authored-by: Stefan Hoffmann --- .../extensions/active_job_adapter.rb | 7 +++-- lib/shoryuken/launcher.rb | 3 +++ spec/shared_examples_for_active_job.rb | 6 +++-- spec/shoryuken/launcher_spec.rb | 26 ++++++++++++++----- 4 files changed, 32 insertions(+), 10 deletions(-) diff --git a/lib/shoryuken/extensions/active_job_adapter.rb b/lib/shoryuken/extensions/active_job_adapter.rb index 21c81286..3a7482bc 100644 --- a/lib/shoryuken/extensions/active_job_adapter.rb +++ b/lib/shoryuken/extensions/active_job_adapter.rb @@ -66,8 +66,11 @@ def message(queue, job) } if queue.fifo? - # See https://github.com/phstc/shoryuken/issues/457 - msg[:message_deduplication_id] = Digest::SHA256.hexdigest(JSON.dump(body.except('job_id'))) + # See https://github.com/ruby-shoryuken/shoryuken/issues/457 and + # https://github.com/ruby-shoryuken/shoryuken/pull/750#issuecomment-1781317929 + msg[:message_deduplication_id] = Digest::SHA256.hexdigest( + JSON.dump(body.except('job_id', 'enqueued_at')) + ) end msg.merge(job_params.except(:message_attributes)) diff --git a/lib/shoryuken/launcher.rb b/lib/shoryuken/launcher.rb index 5a9d94a8..30269fe5 100644 --- a/lib/shoryuken/launcher.rb +++ b/lib/shoryuken/launcher.rb @@ -16,6 +16,9 @@ def start def stop! initiate_stop + # Don't await here so the timeout below is not delayed + stop_new_dispatching + executor.shutdown executor.kill unless executor.wait_for_termination(Shoryuken.options[:timeout]) diff --git a/spec/shared_examples_for_active_job.rb b/spec/shared_examples_for_active_job.rb index 49c13741..c1f2be06 100644 --- a/spec/shared_examples_for_active_job.rb +++ b/spec/shared_examples_for_active_job.rb @@ -42,9 +42,11 @@ class TestJob < ActiveJob::Base; end context 'when fifo' do let(:fifo) { true } - it 'does not include job_id in the deduplication_id' do + it 'does not include job_id and enqueued_at in the deduplication_id' do expect(queue).to receive(:send_message) do |hash| - message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id'))) + message_deduplication_id = Digest::SHA256.hexdigest( + JSON.dump(job.serialize.except('job_id', 'enqueued_at')) + ) expect(hash[:message_deduplication_id]).to eq(message_deduplication_id) end diff --git a/spec/shoryuken/launcher_spec.rb b/spec/shoryuken/launcher_spec.rb index 0c9a5318..bc65b88c 100644 --- a/spec/shoryuken/launcher_spec.rb +++ b/spec/shoryuken/launcher_spec.rb @@ -67,10 +67,17 @@ end it 'fires quiet, shutdown and stopped event' do - expect(subject).to receive(:fire_event).with(:quiet, true) - expect(subject).to receive(:fire_event).with(:shutdown, true) - expect(subject).to receive(:fire_event).with(:stopped) + allow(subject).to receive(:fire_event) subject.stop + expect(subject).to have_received(:fire_event).with(:quiet, true) + expect(subject).to have_received(:fire_event).with(:shutdown, true) + expect(subject).to have_received(:fire_event).with(:stopped) + end + + it 'stops the managers' do + subject.stop + expect(first_group_manager).to have_received(:stop_new_dispatching) + expect(second_group_manager).to have_received(:stop_new_dispatching) end end @@ -83,9 +90,16 @@ end it 'fires shutdown and stopped event' do - expect(subject).to receive(:fire_event).with(:shutdown, true) - expect(subject).to receive(:fire_event).with(:stopped) + allow(subject).to receive(:fire_event) + subject.stop! + expect(subject).to have_received(:fire_event).with(:shutdown, true) + expect(subject).to have_received(:fire_event).with(:stopped) + end + + it 'stops the managers' do subject.stop! + expect(first_group_manager).to have_received(:stop_new_dispatching) + expect(second_group_manager).to have_received(:stop_new_dispatching) end end -end \ No newline at end of file +end