diff --git a/lib/eventq/eventq_aws/aws_queue_worker.rb b/lib/eventq/eventq_aws/aws_queue_worker.rb index b8382fb..6364997 100644 --- a/lib/eventq/eventq_aws/aws_queue_worker.rb +++ b/lib/eventq/eventq_aws/aws_queue_worker.rb @@ -104,44 +104,56 @@ def process_message(msg, poller, queue, block) def reject_message(queue, poller, msg, retry_attempts, message, args) if !queue.allow_retry || retry_attempts >= queue.max_retry_attempts - EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected removing from queue. Message: #{serialize_message(message)}") + queue_will_not_retry_message(queue, poller, msg, message) + elsif args.kill + queue_will_kill_message(poller, msg, message) + elsif queue.allow_retry + queue_will_retry_message(queue, poller, msg, retry_attempts, message, args) + end + end - # remove the message from the queue so that it does not get retried again - poller.delete_message(msg) + def queue_will_not_retry_message(queue, poller, msg, message) + EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected removing from queue. Message: #{serialize_message(message)}") - if retry_attempts >= queue.max_retry_attempts - EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Retry attempt limit exceeded.") - context.call_on_retry_exceeded_block(message) - end - elsif args.kill - EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected without retry. Message: #{serialize_message(message)}") + # remove the message from the queue so that it does not get retried again + poller.delete_message(msg) - # remove the message from the queue so that it does not get retried again - poller.delete_message(msg) + if retry_attempts >= queue.max_retry_attempts + EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Retry attempt limit exceeded.") + context.call_on_retry_exceeded_block(message) + end + end - EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Message killed.") - context.call_on_killed_block(message) - elsif queue.allow_retry - retry_attempts += 1 + def queue_will_kill_message(poller, msg, message) + EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Rejected without retry. Message: #{serialize_message(message)}") - EventQ.logger.warn("[#{self.class}] - Message Id: #{args.id}. Rejected requesting retry. Attempts: #{retry_attempts}") + # remove the message from the queue so that it does not get retried again + poller.delete_message(msg) - visibility_timeout = @calculate_visibility_timeout.call( - retry_attempts: retry_attempts, - queue_settings: { - allow_retry_back_off: queue.allow_retry_back_off, - max_retry_delay: queue.max_retry_delay, - retry_back_off_grace: queue.retry_back_off_grace, - retry_back_off_weight: queue.retry_back_off_weight, - retry_delay: queue.retry_delay - } - ) + EventQ.logger.error("[#{self.class}] - Message Id: #{args.id}. Message killed.") + context.call_on_killed_block(message) + end - EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{visibility_timeout}" } - poller.change_message_visibility_timeout(msg, visibility_timeout) + def queue_will_retry_message(queue, poller, msg, retry_attempts, message, args) + retry_attempts += 1 - context.call_on_retry_block(message) - end + EventQ.logger.warn("[#{self.class}] - Message Id: #{args.id}. Rejected requesting retry. Attempts: #{retry_attempts}") + + visibility_timeout = @calculate_visibility_timeout.call( + retry_attempts: retry_attempts, + queue_settings: { + allow_retry_back_off: queue.allow_retry_back_off, + max_retry_delay: queue.max_retry_delay, + retry_back_off_grace: queue.retry_back_off_grace, + retry_back_off_weight: queue.retry_back_off_weight, + retry_delay: queue.retry_delay + } + ) + + EventQ.logger.debug { "[#{self.class}] - Sending message for retry. Message TTL: #{visibility_timeout}" } + poller.change_message_visibility_timeout(msg, visibility_timeout) + + context.call_on_retry_block(message) end end end diff --git a/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb b/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb index 7dc1ab8..d979ee8 100644 --- a/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb +++ b/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb @@ -222,7 +222,7 @@ received_count += 1 received_attribute = args.retry_attempts EventQ.logger.debug { "Message Received: #{event}" } - if received_count > 1 + if received_count == 2 args.kill = true end end