Skip to content

Commit

Permalink
Merge pull request #84 from robinlehrmann/transport-name-resolver
Browse files Browse the repository at this point in the history
Added TransportNameResolver for sns and sqs for automatic transport r…
  • Loading branch information
mnapoli authored Apr 10, 2024
2 parents d9168e4 + 7598b6a commit 023ed34
Show file tree
Hide file tree
Showing 12 changed files with 507 additions and 35 deletions.
35 changes: 28 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,6 @@ services:
public: true
autowire: true
arguments:
# Pass the transport name used in config/packages/messenger.yaml
$transportName: 'async'
# true enables partial SQS batch failure
# Enabling this without proper SQS config will consider all your messages successful
# See https://bref.sh/docs/function/handlers.html#partial-batch-response for more details.
Expand Down Expand Up @@ -296,9 +294,6 @@ services:
Bref\Symfony\Messenger\Service\Sns\SnsConsumer:
public: true
autowire: true
arguments:
# Pass the transport name used in config/packages/messenger.yaml
$transportName: 'async'
```

Now, anytime a message is dispatched to SNS, the Lambda function will be called. The Bref consumer class will put back the message into Symfony Messenger to be processed.
Expand Down Expand Up @@ -390,7 +385,6 @@ services:
public: true
autowire: true
arguments:
# Pass the transport name used in config/packages/messenger.yaml
$transportName: 'async'
# Optionnally, if you have different buses in config/packages/messenger.yaml, set $bus like below:
# $bus: '@event.bus'
Expand Down Expand Up @@ -456,6 +450,34 @@ services:
region: us-east-1
```

### Automatic transport recognition

Automatic transport recognition is primarily handled by default through TransportNameResolvers for SNS and SQS,
ensuring that the transport name is automatically passed to your message handlers.
However, in scenarios where you need to manually specify the transport name or adjust the default behavior,
you can do so by setting the `$transportName` parameter in your service definitions within the config/services.yaml file.
This parameter should match the transport name defined in your config/packages/messenger.yaml.
For instance, for a SNSConsumer, you would configure it as follows:

```yaml
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
```

```yaml
# config/services.yaml
services:
Bref\Symfony\Messenger\Service\Sns\SnsConsumer:
public: true
autowire: true
arguments:
# Pass the transport name used in config/packages/messenger.yaml
$transportName: 'async'
```

### Disabling transports

By default, this package registers Symfony Messenger transports for SQS, SNS and EventBridge.
Expand All @@ -481,6 +503,5 @@ services:
public: true
autowire: true
arguments:
$transportName: 'async'
$serializer: '@Happyr\MessageSerializer\Serializer'
```
14 changes: 13 additions & 1 deletion src/DependencyInjection/BrefMessengerExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,26 @@

use Symfony\Component\Config\FileLocator;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Extension\PrependExtensionInterface;
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;

class BrefMessengerExtension extends Extension
class BrefMessengerExtension extends Extension implements PrependExtensionInterface
{
public function load(array $configs, ContainerBuilder $container): void
{
$loader = new YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
$loader->load('services.yaml');
}

public function prepend(ContainerBuilder $container): void
{
$configs = $container->getExtensionConfig('framework');

foreach (array_reverse($configs) as $config) {
if (array_key_exists('messenger', $config)) {
$container->setParameter('messenger.transports', $config['messenger']['transports']);
}
}
}
}
12 changes: 12 additions & 0 deletions src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,26 @@ services:
arguments:
- '@logger'

Bref\Symfony\Messenger\Service\MessengerTransportConfiguration:
arguments:
$messengerTransportsConfiguration: '%messenger.transports%'

# SNS
Bref\Symfony\Messenger\Service\Sns\SnsTransportNameResolver:
arguments:
- '@Bref\Symfony\Messenger\Service\MessengerTransportConfiguration'
Bref\Symfony\Messenger\Service\Sns\SnsTransportFactory:
tags: ['messenger.transport_factory']
arguments:
- '@bref.messenger.sns_client'
bref.messenger.sns_client:
class: AsyncAws\Sns\SnsClient

# SQS
Bref\Symfony\Messenger\Service\Sqs\SqsTransportNameResolver:
arguments:
- '@Bref\Symfony\Messenger\Service\MessengerTransportConfiguration'

# EventBridge
Bref\Symfony\Messenger\Service\EventBridge\EventBridgeTransportFactory:
tags: ['messenger.transport_factory']
Expand Down
37 changes: 37 additions & 0 deletions src/Service/MessengerTransportConfiguration.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

namespace Bref\Symfony\Messenger\Service;

use InvalidArgumentException;

/** @final */
class MessengerTransportConfiguration
{
public function __construct(
private array $messengerTransportsConfiguration
) {
}

/** @throws InvalidArgumentException */
public function provideTransportFromEventSource(string $eventSourceWithProtocol): string
{
foreach ($this->messengerTransportsConfiguration as $messengerTransport => $messengerOptions) {
$dsn = $this->extractDsnFromTransport($messengerOptions);

if ($dsn === $eventSourceWithProtocol) {
return $messengerTransport;
}
}

throw new InvalidArgumentException(sprintf('No transport found for eventSource "%s".', $eventSourceWithProtocol));
}

private function extractDsnFromTransport(string|array $messengerTransport): string
{
if (is_array($messengerTransport) && array_key_exists('dsn', $messengerTransport)) {
return $messengerTransport['dsn'];
}

return $messengerTransport;
}
}
21 changes: 18 additions & 3 deletions src/Service/Sns/SnsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
use Bref\Context\Context;
use Bref\Event\Sns\SnsEvent;
use Bref\Event\Sns\SnsHandler;
use Bref\Event\Sns\SnsRecord;
use Bref\Symfony\Messenger\Service\BusDriver;
use LogicException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

Expand All @@ -15,21 +17,25 @@ final class SnsConsumer extends SnsHandler
private $bus;
/** @var SerializerInterface */
protected $serializer;
/** @var string */
/** @var string|null */
private $transportName;
/** @var BusDriver */
private $busDriver;
/** @var SnsTransportNameResolver|null */
private $transportNameResolver;

public function __construct(
BusDriver $busDriver,
MessageBusInterface $bus,
SerializerInterface $serializer,
string $transportName
string $transportName = null,
SnsTransportNameResolver $transportNameResolver = null,
) {
$this->busDriver = $busDriver;
$this->bus = $bus;
$this->serializer = $serializer;
$this->transportName = $transportName;
$this->transportNameResolver = $transportNameResolver;
}

public function handleSns(SnsEvent $event, Context $context): void
Expand All @@ -39,7 +45,16 @@ public function handleSns(SnsEvent $event, Context $context): void
$headers = isset($attributes['Headers']) ? $attributes['Headers']->getValue() : '[]';
$envelope = $this->serializer->decode(['body' => $record->getMessage(), 'headers' => json_decode($headers, true)]);

$this->busDriver->putEnvelopeOnBus($this->bus, $envelope, $this->transportName);
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope, $this->resolveTransportName($record));
}
}

private function resolveTransportName(SnsRecord $record): string
{
if (null === $this->transportName && null === $this->transportNameResolver) {
throw new LogicException('You need to set $transportNameResolver or $transportName.');
}

return $this->transportName ?? ($this->transportNameResolver)($record);
}
}
30 changes: 30 additions & 0 deletions src/Service/Sns/SnsTransportNameResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace Bref\Symfony\Messenger\Service\Sns;

use Bref\Event\Sns\SnsRecord;
use Bref\Symfony\Messenger\Service\MessengerTransportConfiguration;
use InvalidArgumentException;

/** @final */
class SnsTransportNameResolver
{
private const TRANSPORT_PROTOCOL = 'sns://';

public function __construct(
private MessengerTransportConfiguration $configurationProvider
) {
}

/** @throws InvalidArgumentException */
public function __invoke(SnsRecord $snsRecord): string
{
if (!array_key_exists('EventSubscriptionArn', $snsRecord->toArray())) {
throw new InvalidArgumentException('EventSubscriptionArn is missing in sns record.');
}

$eventSourceArn = $snsRecord->getEventSubscriptionArn();

return $this->configurationProvider->provideTransportFromEventSource(self::TRANSPORT_PROTOCOL . $eventSourceArn);
}
}
22 changes: 18 additions & 4 deletions src/Service/Sqs/SqsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Bref\Event\Sqs\SqsHandler;
use Bref\Event\Sqs\SqsRecord;
use Bref\Symfony\Messenger\Service\BusDriver;
use LogicException;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
Expand All @@ -23,7 +24,9 @@ final class SqsConsumer extends SqsHandler
private $bus;
/** @var SerializerInterface */
protected $serializer;
/** @var string */
/** @var SqsTransportNameResolver */
private $transportNameResolver;
/** @var string|null */
private $transportName;
/** @var BusDriver */
private $busDriver;
Expand All @@ -36,16 +39,18 @@ public function __construct(
BusDriver $busDriver,
MessageBusInterface $bus,
SerializerInterface $serializer,
string $transportName,
string $transportName = null,
LoggerInterface $logger = null,
bool $partialBatchFailure = false
bool $partialBatchFailure = false,
SqsTransportNameResolver $transportNameResolver = null
) {
$this->busDriver = $busDriver;
$this->bus = $bus;
$this->serializer = $serializer;
$this->transportName = $transportName;
$this->logger = $logger ?? new NullLogger();
$this->partialBatchFailure = $partialBatchFailure;
$this->transportNameResolver = $transportNameResolver;
}

public function handleSqs(SqsEvent $event, Context $context): void
Expand Down Expand Up @@ -93,7 +98,7 @@ public function handleSqs(SqsEvent $event, Context $context): void
if ('' !== $context->getTraceId()) {
$stamps[] = new AmazonSqsXrayTraceHeaderStamp($context->getTraceId());
}
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName);
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->resolveTransportName($record));
} catch (UnrecoverableExceptionInterface $exception) {
$this->logger->error(sprintf('SQS record with id "%s" failed to be processed. But failure was marked as unrecoverable. Message will be acknowledged.', $record->getMessageId()));
$this->logger->error($exception);
Expand All @@ -116,4 +121,13 @@ private function readMessageGroupIdOfRecord(SqsRecord $record): ?string
$recordAsArray = $record->toArray();
return $recordAsArray['attributes']['MessageGroupId'] ?? null;
}

private function resolveTransportName(SqsRecord $record): string
{
if (null === $this->transportName && null === $this->transportNameResolver) {
throw new LogicException('You need to set $transportNameResolver or $transportName.');
}

return $this->transportName ?? ($this->transportNameResolver)($record);
}
}
29 changes: 29 additions & 0 deletions src/Service/Sqs/SqsTransportNameResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace Bref\Symfony\Messenger\Service\Sqs;

use Bref\Event\Sqs\SqsRecord;
use Bref\Symfony\Messenger\Service\MessengerTransportConfiguration;
use InvalidArgumentException;

/** @final */
class SqsTransportNameResolver
{
private const TRANSPORT_PROTOCOL = 'sqs://';

public function __construct(
private MessengerTransportConfiguration $configurationProvider
) {
}

public function __invoke(SqsRecord $sqsRecord): string
{
if (!array_key_exists('eventSourceARN', $sqsRecord->toArray())) {
throw new InvalidArgumentException('EventSourceArn is missing in sqs record.');
}

$eventSourceArn = $sqsRecord->toArray()['eventSourceARN'];

return $this->configurationProvider->provideTransportFromEventSource(self::TRANSPORT_PROTOCOL . $eventSourceArn);
}
}
48 changes: 48 additions & 0 deletions tests/Unit/Service/MessengerTransportConfigurationTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

namespace Bref\Symfony\Messenger\Test\Unit\Service;

use Bref\Symfony\Messenger\Service\MessengerTransportConfiguration;
use InvalidArgumentException;
use PHPUnit\Framework\TestCase;

final class MessengerTransportConfigurationTest extends TestCase
{
public function test_existing_transport_will_be_found_with_existing_event_source_arn(): void
{
$messengerTransportConfiguration = new MessengerTransportConfiguration([
'async_example_one' => 'sqs://arn:aws:sqs:us-east-1:123456789012:example_one',
'async_example_two' => [
'dsn' => 'sqs://arn:aws:sqs:us-east-1:123456789012:example_two',
],
]);

self::assertSame(
'async_example_one',
$messengerTransportConfiguration->provideTransportFromEventSource(
'sqs://arn:aws:sqs:us-east-1:123456789012:example_one'
)
);

self::assertSame(
'async_example_two',
$messengerTransportConfiguration->provideTransportFromEventSource(
'sqs://arn:aws:sqs:us-east-1:123456789012:example_two'
)
);
}

public function test_non_existing_transport_for_event_source_arn_will_be_not_found(): void
{
$messengerTransportConfiguration = new MessengerTransportConfiguration([]);

$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage(
'No transport found for eventSource "sqs://arn:aws:sqs:us-east-1:123456789012:missing".'
);

$messengerTransportConfiguration->provideTransportFromEventSource(
'sqs://arn:aws:sqs:us-east-1:123456789012:missing'
);
}
}
Loading

0 comments on commit 023ed34

Please sign in to comment.