Skip to content

Commit

Permalink
refactor reject method
Browse files Browse the repository at this point in the history
  • Loading branch information
samcartwrigh committed Feb 20, 2020
1 parent de4a662 commit 6b40cfb
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 31 deletions.
72 changes: 42 additions & 30 deletions lib/eventq/eventq_aws/aws_queue_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,44 +104,56 @@ def process_message(msg, poller, queue, block)

def reject_message(queue, poller, msg, retry_attempts, message, args)
if !queue.allow_retry || retry_attempts >= queue.max_retry_attempts
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected removing from queue. Message: #{serialize_message(message)}")
queue_will_not_retry_message(queue, poller, msg, message)
elsif args.kill
queue_will_kill_message(poller, msg, message)
elsif queue.allow_retry
queue_will_retry_message(queue, poller, msg, retry_attempts, message, args)
end
end

# remove the message from the queue so that it does not get retried again
poller.delete_message(msg)
def queue_will_not_retry_message(queue, poller, msg, message)
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected removing from queue. Message: #{serialize_message(message)}")

if retry_attempts >= queue.max_retry_attempts
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Retry attempt limit exceeded.")
context.call_on_retry_exceeded_block(message)
end
elsif args.kill
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected without retry. Message: #{serialize_message(message)}")
# remove the message from the queue so that it does not get retried again
poller.delete_message(msg)

# remove the message from the queue so that it does not get retried again
poller.delete_message(msg)
if retry_attempts >= queue.max_retry_attempts
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Retry attempt limit exceeded.")
context.call_on_retry_exceeded_block(message)
end
end

EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Message killed.")
context.call_on_killed_block(message)
elsif queue.allow_retry
retry_attempts += 1
def queue_will_kill_message(poller, msg, message)
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected without retry. Message: #{serialize_message(message)}")

EventQ.logger.warn("[#{self.class}] - Message Id: #{args.id}. Rejected requesting retry. Attempts: #{retry_attempts}")
# remove the message from the queue so that it does not get retried again
poller.delete_message(msg)

visibility_timeout = @calculate_visibility_timeout.call(
retry_attempts: retry_attempts,
queue_settings: {
allow_retry_back_off: queue.allow_retry_back_off,
max_retry_delay: queue.max_retry_delay,
retry_back_off_grace: queue.retry_back_off_grace,
retry_back_off_weight: queue.retry_back_off_weight,
retry_delay: queue.retry_delay
}
)
EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Message killed.")
context.call_on_killed_block(message)
end

EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{visibility_timeout}" }
poller.change_message_visibility_timeout(msg, visibility_timeout)
def queue_will_retry_message(queue, poller, msg, retry_attempts, message, args)
retry_attempts += 1

context.call_on_retry_block(message)
end
EventQ.logger.warn("[#{self.class}] - Message Id: #{args.id}. Rejected requesting retry. Attempts: #{retry_attempts}")

visibility_timeout = @calculate_visibility_timeout.call(
retry_attempts: retry_attempts,
queue_settings: {
allow_retry_back_off: queue.allow_retry_back_off,
max_retry_delay: queue.max_retry_delay,
retry_back_off_grace: queue.retry_back_off_grace,
retry_back_off_weight: queue.retry_back_off_weight,
retry_delay: queue.retry_delay
}
)

EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{visibility_timeout}" }
poller.change_message_visibility_timeout(msg, visibility_timeout)

context.call_on_retry_block(message)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@
received_count += 1
received_attribute = args.retry_attempts
EventQ.logger.debug { "Message Received: #{event}" }
if received_count > 1
if received_count == 2
args.kill = true
end
end
Expand Down

0 comments on commit 6b40cfb

Please sign in to comment.