Skip to content

Commit

Permalink
Merge pull request #58 from t-richard/partial
Browse files Browse the repository at this point in the history
Support partial batch failure
  • Loading branch information
t-richard authored Aug 30, 2022
2 parents 3f354cc + 4704a83 commit ae29f32
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 8 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ services:
arguments:
# Pass the transport name used in config/packages/messenger.yaml
$transportName: 'async'
# true enables partial SQS batch failure
# Enabling this without proper SQS config will consider all your messages successful
# See https://bref.sh/docs/function/handlers.html#partial-batch-response for more details.
$partialBatchFailure: false
```

Now, anytime a message is dispatched to SQS, the Lambda function will be called. The Bref consumer class will put back the message into Symfony Messenger to be processed.
Expand Down
5 changes: 3 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
"symfony/dependency-injection": "^4.3 || ^5.0 || ^6.0",
"symfony/http-kernel": "^4.3 || ^5.0 || ^6.0",
"symfony/yaml": "^4.3 || ^5.0 || ^6.0",
"bref/bref": "^0.5.18 || ^1.0",
"bref/bref": "^1.5",
"async-aws/sns": "^1.0",
"async-aws/sqs": "^1.2",
"async-aws/event-bridge": "^1.0",
"symfony/amazon-sqs-messenger": "^5.2 || ^6.0"
"symfony/amazon-sqs-messenger": "^5.2 || ^6.0",
"symfony/polyfill-php80": "^1.26"
},
"require-dev": {
"phpunit/phpunit": "^9.4",
Expand Down
49 changes: 43 additions & 6 deletions src/Service/Sqs/SqsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use Bref\Event\Sqs\SqsEvent;
use Bref\Event\Sqs\SqsHandler;
use Bref\Symfony\Messenger\Service\BusDriver;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsXrayTraceHeaderStamp;
use Symfony\Component\Messenger\MessageBusInterface;
Expand All @@ -23,44 +25,79 @@ final class SqsConsumer extends SqsHandler
private $transportName;
/** @var BusDriver */
private $busDriver;
/** @var bool */
private $partialBatchFailure;
/** @var LoggerInterface|null */
private $logger;

public function __construct(
BusDriver $busDriver,
MessageBusInterface $bus,
SerializerInterface $serializer,
string $transportName
string $transportName,
LoggerInterface $logger = null,
bool $partialBatchFailure = false
) {
$this->busDriver = $busDriver;
$this->bus = $bus;
$this->serializer = $serializer;
$this->transportName = $transportName;
$this->logger = $logger ?? new NullLogger();
$this->partialBatchFailure = $partialBatchFailure;
}

public function handleSqs(SqsEvent $event, Context $context): void
{
$isFifoQueue = null;
$hasPreviousMessageFailed = false;

foreach ($event->getRecords() as $record) {
if ($isFifoQueue === null) {
$isFifoQueue = \str_ends_with($record->toArray()['eventSourceARN'], '.fifo');
}

/*
* When using FIFO queues, preserving order is important.
* If a previous message has failed in the batch, we need to skip the next ones and requeue them.
*/
if ($isFifoQueue && $hasPreviousMessageFailed) {
$this->markAsFailed($record);
}

$headers = [];
$attributes = $record->getMessageAttributes();

if (isset($attributes[self::MESSAGE_ATTRIBUTE_NAME]) && $attributes[self::MESSAGE_ATTRIBUTE_NAME]['dataType'] === 'String') {
$headers = json_decode($attributes[self::MESSAGE_ATTRIBUTE_NAME]['stringValue'], true);
unset($attributes[self::MESSAGE_ATTRIBUTE_NAME]);
}

foreach ($attributes as $name => $attribute) {
if ($attribute['dataType'] !== 'String') {
continue;
}
$headers[$name] = $attribute['stringValue'];
}

$envelope = $this->serializer->decode(['body' => $record->getBody(), 'headers' => $headers]);
try {
$envelope = $this->serializer->decode(['body' => $record->getBody(), 'headers' => $headers]);

$stamps = [new AmazonSqsReceivedStamp($record->getMessageId())];

$stamps = [new AmazonSqsReceivedStamp($record->getMessageId())];
if ('' !== $context->getTraceId()) {
$stamps[] = new AmazonSqsXrayTraceHeaderStamp($context->getTraceId());
}
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName);
} catch (\Throwable $exception) {
if ($this->partialBatchFailure === false) {
throw $exception;
}

if ('' !== $context->getTraceId()) {
$stamps[] = new AmazonSqsXrayTraceHeaderStamp($context->getTraceId());
$this->logger->error(sprintf('SQS record with id "%s" failed to be processed.', $record->getMessageId()));
$this->logger->error($exception->getMessage());
$this->markAsFailed($record);
$hasPreviousMessageFailed = true;
}
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName);
}
}
}
2 changes: 2 additions & 0 deletions tests/Unit/Service/Sqs/SqsConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public function testSerializer()
],
'eventSource'=>'aws:sqs',
'messageId' => $messageId,
'eventSourceARN' => 'arn:aws:sqs:us-east-1:123456789012:queue1'
],
],
]);
Expand Down Expand Up @@ -120,6 +121,7 @@ public function testSerializerWithXRayHeader()
],
'eventSource'=>'aws:sqs',
'messageId' => $messageId,
'eventSourceARN' => 'arn:aws:sqs:us-east-1:123456789012:queue1'
],
],
]);
Expand Down

0 comments on commit ae29f32

Please sign in to comment.