Skip to content

Commit

Permalink
Stop the dispatching of new messages when a SIGTERM signal has been r…
Browse files Browse the repository at this point in the history
…eceived (#750)

* Stop the dispatching of new messages when a SIGTERM signal has been received

* Use ActiveSupport::Testing::TimeHelpers to fix a flaky test that checks the message deduplication id

* Removed ActiveSupport::Testing::TimeHelpers as it is not available in rails 4

* Exclude `enqueued_at` in the body for the deduplication id

---------

Co-authored-by: Stefan Hoffmann <[email protected]>
  • Loading branch information
hoffi and Stefan Hoffmann authored Oct 26, 2023
1 parent f02d28b commit a2e2c89
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 10 deletions.
7 changes: 5 additions & 2 deletions lib/shoryuken/extensions/active_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ def message(queue, job)
}

if queue.fifo?
# See https://github.com/phstc/shoryuken/issues/457
msg[:message_deduplication_id] = Digest::SHA256.hexdigest(JSON.dump(body.except('job_id')))
# See https://github.com/ruby-shoryuken/shoryuken/issues/457 and
# https://github.com/ruby-shoryuken/shoryuken/pull/750#issuecomment-1781317929
msg[:message_deduplication_id] = Digest::SHA256.hexdigest(
JSON.dump(body.except('job_id', 'enqueued_at'))
)
end

msg.merge(job_params.except(:message_attributes))
Expand Down
3 changes: 3 additions & 0 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ def start
def stop!
initiate_stop

# Don't await here so the timeout below is not delayed
stop_new_dispatching

executor.shutdown
executor.kill unless executor.wait_for_termination(Shoryuken.options[:timeout])

Expand Down
6 changes: 4 additions & 2 deletions spec/shared_examples_for_active_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ class TestJob < ActiveJob::Base; end
context 'when fifo' do
let(:fifo) { true }

it 'does not include job_id in the deduplication_id' do
it 'does not include job_id and enqueued_at in the deduplication_id' do
expect(queue).to receive(:send_message) do |hash|
message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id')))
message_deduplication_id = Digest::SHA256.hexdigest(
JSON.dump(job.serialize.except('job_id', 'enqueued_at'))
)

expect(hash[:message_deduplication_id]).to eq(message_deduplication_id)
end
Expand Down
26 changes: 20 additions & 6 deletions spec/shoryuken/launcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,17 @@
end

it 'fires quiet, shutdown and stopped event' do
expect(subject).to receive(:fire_event).with(:quiet, true)
expect(subject).to receive(:fire_event).with(:shutdown, true)
expect(subject).to receive(:fire_event).with(:stopped)
allow(subject).to receive(:fire_event)
subject.stop
expect(subject).to have_received(:fire_event).with(:quiet, true)
expect(subject).to have_received(:fire_event).with(:shutdown, true)
expect(subject).to have_received(:fire_event).with(:stopped)
end

it 'stops the managers' do
subject.stop
expect(first_group_manager).to have_received(:stop_new_dispatching)
expect(second_group_manager).to have_received(:stop_new_dispatching)
end
end

Expand All @@ -83,9 +90,16 @@
end

it 'fires shutdown and stopped event' do
expect(subject).to receive(:fire_event).with(:shutdown, true)
expect(subject).to receive(:fire_event).with(:stopped)
allow(subject).to receive(:fire_event)
subject.stop!
expect(subject).to have_received(:fire_event).with(:shutdown, true)
expect(subject).to have_received(:fire_event).with(:stopped)
end

it 'stops the managers' do
subject.stop!
expect(first_group_manager).to have_received(:stop_new_dispatching)
expect(second_group_manager).to have_received(:stop_new_dispatching)
end
end
end
end

0 comments on commit a2e2c89

Please sign in to comment.