Skip to content

Commit

Permalink
Change to accept DLQ name instead of DLQ URL
Browse files Browse the repository at this point in the history
  • Loading branch information
honglu committed Jan 17, 2019
1 parent fd22a89 commit eb7004d
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 19 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This serverless app redrives the messages from an SQS DLQ (Dead Letter Queue) ba
## Installation Instructions

1. [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and login
1. Go to the app's page on the [Serverless Application Repository](TODO) and click "Deploy"
1. Go to the app's page on the [Serverless Application Repository](https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:303769779339:applications~aws-sqs-dlq-redriver) and click "Deploy"
1. Provide the required app parameters (see parameter details below) and click "Deploy"

## App Parameters
Expand All @@ -31,7 +31,7 @@ You can use the app to redrive messages from any DLQ. To redrive the messages, y
The SQS DLQ Redriver Lambda accepts the following input:
```json
{
"DLQUrl": String,
"DLQName": String,
"MaxMessageCount": Integer
}
```
Expand All @@ -41,7 +41,7 @@ The SQS DLQ Redriver Lambda accepts the following input:
Example:
```json
{
"DLQUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/my-dlq",
"DLQName": "my-dlq",
"MaxMessageCount": 100
}
```
Expand Down
Binary file modified images/app-architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 4 additions & 4 deletions src/redriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@

LOG = lambdalogging.getLogger(__name__)
RECEIVE_MESSAGE_MAX_NUMBER_OF_MESSAGES_LIMIT = 10
DLQ_URL_PROPERTY = 'DLQUrl'
DLQ_NAME_PROPERTY = 'DLQName'
MAX_MESSAGE_COUNT_PROPERTY = 'MaxMessageCount'


def redrive(event, context):
"""Lambda function handler."""
LOG.info('Received event: %s', event)
_validate(event)
dlq = event[DLQ_URL_PROPERTY]
dlq = sqs_helper.get_queue_url(event[DLQ_NAME_PROPERTY])
max_message_count = event[MAX_MESSAGE_COUNT_PROPERTY]
source_queues = sqs_helper.get_source_queues(dlq)
processed_message_count = 0
Expand All @@ -35,8 +35,8 @@ def redrive(event, context):


def _validate(event):
if DLQ_URL_PROPERTY not in event:
raise ValueError('{} is missing from event'.format(DLQ_URL_PROPERTY))
if DLQ_NAME_PROPERTY not in event:
raise ValueError('{} is missing from event'.format(DLQ_NAME_PROPERTY))
if MAX_MESSAGE_COUNT_PROPERTY not in event:
raise ValueError('{} is missing from event'.format(MAX_MESSAGE_COUNT_PROPERTY))
try:
Expand Down
21 changes: 18 additions & 3 deletions src/sqs_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ def receive_messages(queue_url, max_message):
"""
LOG.info('Receiving messages from queue %s', queue_url)
receive_message_count = min(max_message, RECEIVE_MESSAGE_MAX_NUMBER_OF_MESSAGES_LIMIT)
return SQS.receive_message(QueueUrl=queue_url,
MaxNumberOfMessages=receive_message_count,
MessageAttributeNames=['All'])['Messages']
response = SQS.receive_message(QueueUrl=queue_url,
MaxNumberOfMessages=receive_message_count,
MessageAttributeNames=['All'])
if 'Messages' in response:
return response['Messages']
else:
return []


def get_source_queues(queue_url):
Expand Down Expand Up @@ -61,3 +65,14 @@ def delete_messages(queue_url, messages):
raise Exception(
'Failed to delete messages from queue {}. Failed messages: {}'
.format(queue_url, delete_message_response['Failed']))


def get_queue_url(queue_name):
"""
Get the SQS Queue URL.
:param queue_name: the queue name of the SQS queue
:return: queue URL
"""
LOG.info('Getting queue url from queue %s', queue_name)
return SQS.get_queue_url(QueueName=queue_name)['QueueUrl']
5 changes: 3 additions & 2 deletions template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ Metadata:
ReadmeUrl: ../../README.md
Labels: [serverless, SQS]
HomePageUrl: https://github.com/honglu/aws-sqs-dlq-redriver
SemanticVersion: 1.0.1
SourceCodeUrl: https://github.com/honglu/aws-sqs-dlq-redriver/tree/1.0.1
SemanticVersion: 1.1.0
SourceCodeUrl: https://github.com/honglu/aws-sqs-dlq-redriver/tree/1.1.0

Parameters:
LogLevel:
Expand Down Expand Up @@ -43,6 +43,7 @@ Resources:
- sqs:SendMessage
- sqs:DeleteMessage
- sqs:ListDeadLetterSourceQueues
- sqs:GetQueueUrl
Resource: !Sub 'arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:*'

Outputs:
Expand Down
23 changes: 16 additions & 7 deletions test/unit/test_redriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,50 @@ def test_missing_dlq_url():

def test_missing_max_message_count():
with pytest.raises(ValueError):
redriver.redrive({'DLQUrl': 'mydlq'}, None)
redriver.redrive({'DLQName': 'mydlq'}, None)


def test_max_message_count_not_integer():
with pytest.raises(ValueError):
redriver.redrive({'DLQUrl': 'mydlq', 'MaxMessageCount': 'string'}, None)
redriver.redrive({'DLQName': 'mydlq', 'MaxMessageCount': 'string'}, None)


def test_max_message_count_negative_integer():
with pytest.raises(ValueError):
redriver.redrive({'DLQUrl': 'mydlq', 'MaxMessageCount': -1}, None)
redriver.redrive({'DLQName': 'mydlq', 'MaxMessageCount': -1}, None)


def test_redrive_no_message(mock_sqs_helper):
mock_sqs_helper.receive_messages.return_value = []
dlq_name = 'dlqName'
dlq_url = 'mydlq'
max_message_count = 10
redriver.redrive({'DLQUrl': dlq_url, 'MaxMessageCount': max_message_count}, None)
mock_sqs_helper.get_queue_url.return_value = dlq_url
mock_sqs_helper.receive_messages.return_value = []
redriver.redrive({'DLQName': dlq_name, 'MaxMessageCount': max_message_count}, None)

mock_sqs_helper.get_queue_url.assert_called_once_with(dlq_name)
mock_sqs_helper.get_source_queues.assert_called_once_with(dlq_url)
mock_sqs_helper.receive_messages.assert_called_once_with(dlq_url, max_message_count)
mock_sqs_helper.send_messages.assert_not_called()
mock_sqs_helper.delete_messages.assert_not_called()


def test_redrive(mock_sqs_helper):
dlq_name = 'dlqName'
dlq_url = 'mydlq'
max_message_count = 10
source_queue = 'mySource'
mock_sqs_helper.get_queue_url.return_value = dlq_url
mock_sqs_helper.get_source_queues.return_value = [source_queue]
mock_sqs_helper.receive_messages.side_effect = [[{
'MessageId': 'myId',
'Body': 'This is my message',
'MessageAttributes': [],
'ReceiptHandle': 'myHandle'
}], []]
redriver.redrive({'DLQUrl': dlq_url, 'MaxMessageCount': max_message_count}, None)
redriver.redrive({'DLQName': dlq_name, 'MaxMessageCount': max_message_count}, None)

mock_sqs_helper.get_queue_url.assert_called_once_with(dlq_name)
mock_sqs_helper.get_source_queues.assert_called_once_with(dlq_url)
mock_sqs_helper.receive_messages.assert_has_calls([call(dlq_url, max_message_count), call(dlq_url, max_message_count - 1)])
mock_sqs_helper.send_messages.assert_called_once_with(source_queue, [{
Expand All @@ -67,17 +73,20 @@ def test_redrive(mock_sqs_helper):


def test_redrive_no_message_attributes(mock_sqs_helper):
dlq_name = 'dlqName'
dlq_url = 'mydlq'
max_message_count = 10
source_queue = 'mySource'
mock_sqs_helper.get_queue_url.return_value = dlq_url
mock_sqs_helper.get_source_queues.return_value = [source_queue]
mock_sqs_helper.receive_messages.side_effect = [[{
'MessageId': 'myId',
'Body': 'This is my message',
'ReceiptHandle': 'myHandle'
}], []]
redriver.redrive({'DLQUrl': dlq_url, 'MaxMessageCount': max_message_count}, None)
redriver.redrive({'DLQName': dlq_name, 'MaxMessageCount': max_message_count}, None)

mock_sqs_helper.get_queue_url.assert_called_once_with(dlq_name)
mock_sqs_helper.get_source_queues.assert_called_once_with(dlq_url)
mock_sqs_helper.receive_messages.assert_has_calls([call(dlq_url, max_message_count), call(dlq_url, max_message_count - 1)])
mock_sqs_helper.send_messages.assert_called_once_with(source_queue, [{
Expand Down
19 changes: 19 additions & 0 deletions test/unit/test_sqs_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ def test_receive_messages(mock_sqs):
MessageAttributeNames=['All'])


def test_receive_messages_no_messages(mock_sqs):
dlq_url = 'mydlq'
max_message_count = 9
mock_sqs.receive_message.return_value = {}

assert sqs_helper.receive_messages(dlq_url, max_message_count) == []
mock_sqs.receive_message.assert_called_once_with(QueueUrl=dlq_url, MaxNumberOfMessages=max_message_count,
MessageAttributeNames=['All'])


def test_receive_messages_more_than_default_max_message(mock_sqs):
dlq_url = 'mydlq'
max_message_count = 11
Expand Down Expand Up @@ -75,3 +85,12 @@ def test_delete_messages_failed(mock_sqs):
with pytest.raises(Exception):
sqs_helper.delete_messages(queue_url, messages)
mock_sqs.delete_message_batch.assert_called_once_with(QueueUrl=queue_url, Entries=messages)


def test_get_queue_url(mock_sqs):
queue_name = 'myQueue'
queue_url = 'myUrl'
mock_sqs.get_queue_url.return_value = {'QueueUrl': queue_url}

assert sqs_helper.get_queue_url(queue_name) == queue_url
mock_sqs.get_queue_url.assert_called_once_with(QueueName=queue_name)

0 comments on commit eb7004d

Please sign in to comment.