From 1fd1f89874344f5296999e74c3c4e7a8e4265830 Mon Sep 17 00:00:00 2001 From: Robin Lehrmann Date: Fri, 9 Feb 2024 18:58:00 +0100 Subject: [PATCH 1/3] Added TransportNameResolver for sns and sqs for automatic transport resolving --- .../BrefMessengerExtension.php | 14 +- src/Resources/config/services.yaml | 12 ++ .../MessengerTransportConfiguration.php | 37 ++++++ src/Service/Sns/SnsConsumer.php | 12 +- src/Service/Sns/SnsTransportNameResolver.php | 30 +++++ src/Service/Sqs/SqsConsumer.php | 10 +- src/Service/Sqs/SqsTransportNameResolver.php | 29 +++++ .../MessengerTransportConfigurationTest.php | 48 +++++++ tests/Unit/Service/Sns/SnsConsumerTest.php | 104 +++++++++++++-- .../Sns/SnsTransportNameResolverTest.php | 46 +++++++ tests/Unit/Service/Sqs/SqsConsumerTest.php | 122 ++++++++++++++++-- 11 files changed, 436 insertions(+), 28 deletions(-) create mode 100644 src/Service/MessengerTransportConfiguration.php create mode 100644 src/Service/Sns/SnsTransportNameResolver.php create mode 100644 src/Service/Sqs/SqsTransportNameResolver.php create mode 100644 tests/Unit/Service/MessengerTransportConfigurationTest.php create mode 100644 tests/Unit/Service/Sns/SnsTransportNameResolverTest.php diff --git a/src/DependencyInjection/BrefMessengerExtension.php b/src/DependencyInjection/BrefMessengerExtension.php index 08027f4..2c75500 100644 --- a/src/DependencyInjection/BrefMessengerExtension.php +++ b/src/DependencyInjection/BrefMessengerExtension.php @@ -4,14 +4,26 @@ use Symfony\Component\Config\FileLocator; use Symfony\Component\DependencyInjection\ContainerBuilder; +use Symfony\Component\DependencyInjection\Extension\PrependExtensionInterface; use Symfony\Component\DependencyInjection\Loader\YamlFileLoader; use Symfony\Component\HttpKernel\DependencyInjection\Extension; -class BrefMessengerExtension extends Extension +class BrefMessengerExtension extends Extension implements PrependExtensionInterface { public function load(array $configs, ContainerBuilder $container): void { $loader = new YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config')); $loader->load('services.yaml'); } + + public function prepend(ContainerBuilder $container): void + { + $configs = $container->getExtensionConfig('framework'); + + foreach (array_reverse($configs) as $config) { + if (array_key_exists('messenger', $config)) { + $container->setParameter('messenger.transports', $config['messenger']['transports']); + } + } + } } diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index 0bf098a..3a2e0f7 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -4,7 +4,14 @@ services: arguments: - '@logger' + Bref\Symfony\Messenger\Service\MessengerTransportConfiguration: + arguments: + $messengerTransportsConfiguration: '%messenger.transports%' + # SNS + Bref\Symfony\Messenger\Service\Sns\SnsTransportNameResolver: + arguments: + - '@Bref\Symfony\Messenger\Service\MessengerTransportConfiguration' Bref\Symfony\Messenger\Service\Sns\SnsTransportFactory: tags: ['messenger.transport_factory'] arguments: @@ -12,6 +19,11 @@ services: bref.messenger.sns_client: class: AsyncAws\Sns\SnsClient + # SQS + Bref\Symfony\Messenger\Service\Sqs\SqsTransportNameResolver: + arguments: + - '@Bref\Symfony\Messenger\Service\MessengerTransportConfiguration' + # EventBridge Bref\Symfony\Messenger\Service\EventBridge\EventBridgeTransportFactory: tags: ['messenger.transport_factory'] diff --git a/src/Service/MessengerTransportConfiguration.php b/src/Service/MessengerTransportConfiguration.php new file mode 100644 index 0000000..9d6aa7e --- /dev/null +++ b/src/Service/MessengerTransportConfiguration.php @@ -0,0 +1,37 @@ +messengerTransportsConfiguration as $messengerTransport => $messengerOptions) { + $dsn = $this->extractDsnFromTransport($messengerOptions); + + if ($dsn === $eventSourceWithProtocol) { + return $messengerTransport; + } + } + + throw new InvalidArgumentException(sprintf('No transport found for eventSource "%s".', $eventSourceWithProtocol)); + } + + private function extractDsnFromTransport(string|array $messengerTransport): string + { + if (is_array($messengerTransport) && array_key_exists('dsn', $messengerTransport)) { + return $messengerTransport['dsn']; + } + + return $messengerTransport; + } +} diff --git a/src/Service/Sns/SnsConsumer.php b/src/Service/Sns/SnsConsumer.php index 594c672..b934083 100644 --- a/src/Service/Sns/SnsConsumer.php +++ b/src/Service/Sns/SnsConsumer.php @@ -15,21 +15,25 @@ final class SnsConsumer extends SnsHandler private $bus; /** @var SerializerInterface */ protected $serializer; - /** @var string */ - private $transportName; + /** @var SnsTransportNameResolver */ + private $transportNameResolver; /** @var BusDriver */ private $busDriver; + /** @var string|null */ + private $transportName; public function __construct( BusDriver $busDriver, MessageBusInterface $bus, SerializerInterface $serializer, - string $transportName + SnsTransportNameResolver $transportNameResolver, + string $transportName = null ) { $this->busDriver = $busDriver; $this->bus = $bus; $this->serializer = $serializer; $this->transportName = $transportName; + $this->transportNameResolver = $transportNameResolver; } public function handleSns(SnsEvent $event, Context $context): void @@ -39,7 +43,7 @@ public function handleSns(SnsEvent $event, Context $context): void $headers = isset($attributes['Headers']) ? $attributes['Headers']->getValue() : '[]'; $envelope = $this->serializer->decode(['body' => $record->getMessage(), 'headers' => json_decode($headers, true)]); - $this->busDriver->putEnvelopeOnBus($this->bus, $envelope, $this->transportName); + $this->busDriver->putEnvelopeOnBus($this->bus, $envelope, $this->transportName ?? ($this->transportNameResolver)($record)); } } } diff --git a/src/Service/Sns/SnsTransportNameResolver.php b/src/Service/Sns/SnsTransportNameResolver.php new file mode 100644 index 0000000..fc78707 --- /dev/null +++ b/src/Service/Sns/SnsTransportNameResolver.php @@ -0,0 +1,30 @@ +toArray())) { + throw new InvalidArgumentException('EventSubscriptionArn is missing in sns record.'); + } + + $eventSourceArn = $snsRecord->getEventSubscriptionArn(); + + return $this->configurationProvider->provideTransportFromEventSource(self::TRANSPORT_PROTOCOL . $eventSourceArn); + } +} diff --git a/src/Service/Sqs/SqsConsumer.php b/src/Service/Sqs/SqsConsumer.php index 102fc85..100f3a7 100644 --- a/src/Service/Sqs/SqsConsumer.php +++ b/src/Service/Sqs/SqsConsumer.php @@ -23,7 +23,9 @@ final class SqsConsumer extends SqsHandler private $bus; /** @var SerializerInterface */ protected $serializer; - /** @var string */ + /** @var SqsTransportNameResolver */ + private $transportNameResolver; + /** @var string|null */ private $transportName; /** @var BusDriver */ private $busDriver; @@ -36,13 +38,15 @@ public function __construct( BusDriver $busDriver, MessageBusInterface $bus, SerializerInterface $serializer, - string $transportName, + SqsTransportNameResolver $transportNameResolver, + string $transportName = null, LoggerInterface $logger = null, bool $partialBatchFailure = false ) { $this->busDriver = $busDriver; $this->bus = $bus; $this->serializer = $serializer; + $this->transportNameResolver = $transportNameResolver; $this->transportName = $transportName; $this->logger = $logger ?? new NullLogger(); $this->partialBatchFailure = $partialBatchFailure; @@ -93,7 +97,7 @@ public function handleSqs(SqsEvent $event, Context $context): void if ('' !== $context->getTraceId()) { $stamps[] = new AmazonSqsXrayTraceHeaderStamp($context->getTraceId()); } - $this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName); + $this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName ?? ($this->transportNameResolver)($record)); } catch (UnrecoverableExceptionInterface $exception) { $this->logger->error(sprintf('SQS record with id "%s" failed to be processed. But failure was marked as unrecoverable. Message will be acknowledged.', $record->getMessageId())); $this->logger->error($exception); diff --git a/src/Service/Sqs/SqsTransportNameResolver.php b/src/Service/Sqs/SqsTransportNameResolver.php new file mode 100644 index 0000000..a738db0 --- /dev/null +++ b/src/Service/Sqs/SqsTransportNameResolver.php @@ -0,0 +1,29 @@ +toArray())) { + throw new InvalidArgumentException('EventSourceArn is missing in sqs record.'); + } + + $eventSourceArn = $sqsRecord->toArray()['eventSourceARN']; + + return $this->configurationProvider->provideTransportFromEventSource(self::TRANSPORT_PROTOCOL . $eventSourceArn); + } +} diff --git a/tests/Unit/Service/MessengerTransportConfigurationTest.php b/tests/Unit/Service/MessengerTransportConfigurationTest.php new file mode 100644 index 0000000..1e39548 --- /dev/null +++ b/tests/Unit/Service/MessengerTransportConfigurationTest.php @@ -0,0 +1,48 @@ + 'sqs://arn:aws:sqs:us-east-1:123456789012:example_one', + 'async_example_two' => [ + 'dsn' => 'sqs://arn:aws:sqs:us-east-1:123456789012:example_two', + ], + ]); + + self::assertSame( + 'async_example_one', + $messengerTransportConfiguration->provideTransportFromEventSource( + 'sqs://arn:aws:sqs:us-east-1:123456789012:example_one' + ) + ); + + self::assertSame( + 'async_example_two', + $messengerTransportConfiguration->provideTransportFromEventSource( + 'sqs://arn:aws:sqs:us-east-1:123456789012:example_two' + ) + ); + } + + public function test_non_existing_transport_for_event_source_arn_will_be_not_found(): void + { + $messengerTransportConfiguration = new MessengerTransportConfiguration([]); + + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage( + 'No transport found for eventSource "sqs://arn:aws:sqs:us-east-1:123456789012:missing".' + ); + + $messengerTransportConfiguration->provideTransportFromEventSource( + 'sqs://arn:aws:sqs:us-east-1:123456789012:missing' + ); + } +} \ No newline at end of file diff --git a/tests/Unit/Service/Sns/SnsConsumerTest.php b/tests/Unit/Service/Sns/SnsConsumerTest.php index 286f55e..2fecea0 100644 --- a/tests/Unit/Service/Sns/SnsConsumerTest.php +++ b/tests/Unit/Service/Sns/SnsConsumerTest.php @@ -6,6 +6,7 @@ use Bref\Event\Sns\SnsEvent; use Bref\Symfony\Messenger\Service\BusDriver; use Bref\Symfony\Messenger\Service\Sns\SnsConsumer; +use Bref\Symfony\Messenger\Service\Sns\SnsTransportNameResolver; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBus; @@ -13,28 +14,115 @@ class SnsConsumerTest extends TestCase { - public function testSerializer() + private $busDriver; + + private $serializer; + + private MessageBus $bus; + + private $snsTransportNameResolver; + + /** @before */ + public function before(): void { - $busDriver = $this->getMockBuilder(BusDriver::class) + $this->busDriver = $this->getMockBuilder(BusDriver::class) ->disableOriginalConstructor() ->onlyMethods(['putEnvelopeOnBus']) ->getMock(); - $bus = new MessageBus; - $serializer = $this->getMockBuilder(SerializerInterface::class) + + $this->bus = new MessageBus; + + $this->serializer = $this->getMockBuilder(SerializerInterface::class) ->disableOriginalConstructor() ->onlyMethods(['encode', 'decode']) ->getMock(); + $this->snsTransportNameResolver = $this->getMockBuilder(SnsTransportNameResolver::class) + ->disableOriginalConstructor() + ->getMock(); + } + + public function test_serializer() + { + $headers = ['Content-Type' => 'application/json']; + $body = 'Test message.'; + + $this->serializer->expects($this->once()) + ->method('decode') + ->with(['body' => $body, 'headers' => $headers]) + ->willReturn(new Envelope(new \stdClass)); + + $this->snsTransportNameResolver->expects($this->once()) + ->method('__invoke') + ->willReturn('async'); + + $consumer = new SnsConsumer( + $this->busDriver, + $this->bus, + $this->serializer, + $this->snsTransportNameResolver + ); + + $event = $this->snsEvent($body, $headers); + + $consumer->handleSns($event, new Context('', 0, '', '')); + } + + public function test_event_with_transport_detection(): void + { + $headers = ['Content-Type' => 'application/json']; + $body = 'Test message.'; + + $this->serializer->expects($this->once()) + ->method('decode') + ->with(['body' => $body, 'headers' => $headers]) + ->willReturn(new Envelope(new \stdClass)); + + $this->snsTransportNameResolver->expects($this->once()) + ->method('__invoke') + ->willReturn('async'); + + $consumer = new SnsConsumer( + $this->busDriver, + $this->bus, + $this->serializer, + $this->snsTransportNameResolver + ); + + $event = $this->snsEvent($body, $headers); + + $consumer->handleSns($event, new Context('', 0, '', '')); + } + + public function test_event_with_manually_set_transport(): void + { $headers = ['Content-Type' => 'application/json']; $body = 'Test message.'; - $serializer->expects($this->once()) + $this->serializer->expects($this->once()) ->method('decode') ->with(['body' => $body, 'headers' => $headers]) ->willReturn(new Envelope(new \stdClass)); - $consumer = new SnsConsumer($busDriver, $bus, $serializer, 'async'); - $event = new SnsEvent([ + $this->snsTransportNameResolver->expects($this->never()) + ->method('__invoke'); + + $consumer = new SnsConsumer( + $this->busDriver, + $this->bus, + $this->serializer, + $this->snsTransportNameResolver, + 'async' + ); + + $event = $this->snsEvent($body, $headers); + + $consumer->handleSns($event, new Context('', 0, '', '')); + } + + private function snsEvent(string $body, array $headers): SnsEvent + { + return new SnsEvent([ 'Records' => [ [ @@ -51,7 +139,5 @@ public function testSerializer() ], ], ]); - - $consumer->handleSns($event, new Context('', 0, '', '')); } } diff --git a/tests/Unit/Service/Sns/SnsTransportNameResolverTest.php b/tests/Unit/Service/Sns/SnsTransportNameResolverTest.php new file mode 100644 index 0000000..e9698f4 --- /dev/null +++ b/tests/Unit/Service/Sns/SnsTransportNameResolverTest.php @@ -0,0 +1,46 @@ +prophesize(MessengerTransportConfiguration::class); + $messengerTransportConfiguration + ->provideTransportFromEventSource(Argument::cetera()) + ->willReturn('async'); + + $transportNameResolver = new SnsTransportNameResolver($messengerTransportConfiguration->reveal()); + + $event = new SnsEvent([ + 'Records' => [ + [ + + 'EventSource'=>'aws:sns', + 'EventSubscriptionArn' => 'arn:aws:sns:us-east-1:1234567890:async', + 'Sns' => [ + 'Message' => 'Test message.', + 'MessageAttributes' => [ + 'Headers' => [ + 'Type'=> 'String', + 'Value'=> ['Content-Type' => 'application/json'], + ], + ], + ], + ], + ], + ]); + + self::assertSame('async', ($transportNameResolver)($event->getRecords()[0])); + } +} \ No newline at end of file diff --git a/tests/Unit/Service/Sqs/SqsConsumerTest.php b/tests/Unit/Service/Sqs/SqsConsumerTest.php index 2043997..00bc566 100644 --- a/tests/Unit/Service/Sqs/SqsConsumerTest.php +++ b/tests/Unit/Service/Sqs/SqsConsumerTest.php @@ -3,11 +3,10 @@ namespace Bref\Symfony\Messenger\Test\Unit\Service\Sqs; use Bref\Context\Context; -use Bref\Event\Sqs\SqsEvent; use Bref\Symfony\Messenger\Service\BusDriver; use Bref\Symfony\Messenger\Service\Sqs\SqsConsumer; +use Bref\Symfony\Messenger\Service\Sqs\SqsTransportNameResolver; use Bref\Symfony\Messenger\Test\Resources\TestMessage\TestMessage; -use LogicException; use PHPUnit\Framework\TestCase; use Prophecy\Argument; use Prophecy\PhpUnit\ProphecyTrait; @@ -16,7 +15,6 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; use Symfony\Component\Messenger\MessageBus; -use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Throwable; @@ -30,11 +28,14 @@ class SqsConsumerTest extends TestCase private MessageBus $bus; + private $sqsTransportNameResolver; + /** @before */ public function prepare() { $this->busDriver = $this->prophesize(BusDriver::class); $this->serializer = $this->prophesize(SerializerInterface::class); + $this->sqsTransportNameResolver = $this->prophesize(SqsTransportNameResolver::class); $this->bus = new MessageBus; } @@ -62,11 +63,58 @@ public function test_batch_events() ), ]; - $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $transport); + $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $this->sqsTransportNameResolver->reveal()); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); $this->assertEmpty($failures); } + public function test_event_with_transport_detection(): void + { + $transport = 'async'; + $sqsRecords = [ + $this->sqsRecordWillSuccessfullyBeHandled( + new TestMessage('test'), + 'e00c848c-2579-4f6a-a006-ccdc2808ed64', + 'Test message 1', + $transport, + ) + ]; + + $consumer = new SqsConsumer( + $this->busDriver->reveal(), + $this->bus, + $this->serializer->reveal(), + $this->sqsTransportNameResolver->reveal() + ); + + $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); + } + + public function test_event_with_manually_set_transport(): void + { + $transport = 'async_test'; + $sqsRecords = [ + $this->sqsRecordWillSuccessfullyBeHandled( + new TestMessage('test'), + 'e00c848c-2579-4f6a-a006-ccdc2808ed64', + 'Test message 1', + $transport, + ) + ]; + + $this->sqsTransportNameResolver->__invoke(Argument::cetera())->shouldNotBeCalled(); + + $consumer = new SqsConsumer( + $this->busDriver->reveal(), + $this->bus, + $this->serializer->reveal(), + $this->sqsTransportNameResolver->reveal(), + 'async_test' + ); + + $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); + } + public function test_batch_events_with_failure() { $transport = 'async'; @@ -91,7 +139,12 @@ public function test_batch_events_with_failure() ), ]; - $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $transport); + $consumer = new SqsConsumer( + $this->busDriver->reveal(), + $this->bus, + $this->serializer->reveal(), + $this->sqsTransportNameResolver->reveal() + ); $this->expectExceptionMessage('boom'); $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); @@ -121,7 +174,15 @@ public function test_batch_events_failure_with_partial_batch_failure_enabled() ), ]; - $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $transport, null, true); + $consumer = new SqsConsumer( + $this->busDriver->reveal(), + $this->bus, + $this->serializer->reveal(), + $this->sqsTransportNameResolver->reveal(), + null, + null, + true + ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); @@ -156,7 +217,15 @@ public function test_batch_events_failure_on_fifo_queue_with_partial_batch_failu ), ]; - $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $transport, null, true); + $consumer = new SqsConsumer( + $this->busDriver->reveal(), + $this->bus, + $this->serializer->reveal(), + $this->sqsTransportNameResolver->reveal(), + null, + null, + true + ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); @@ -193,7 +262,12 @@ public function test_x_ray_header_is_dispatched_on_bus() ), ]; - $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $transport); + $consumer = new SqsConsumer( + $this->busDriver->reveal(), + $this->bus, + $this->serializer->reveal(), + $this->sqsTransportNameResolver->reveal() + ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', $xrayTraceId)); $this->assertEmpty($failures); } @@ -227,7 +301,15 @@ public function test_unrecoverable_exception_during_batch() ), ]; - $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $transport, null, true); + $consumer = new SqsConsumer( + $this->busDriver->reveal(), + $this->bus, + $this->serializer->reveal(), + $this->sqsTransportNameResolver->reveal(), + null, + null, + true + ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); $this->assertEmpty($failures); @@ -270,7 +352,15 @@ public function test_message_group_id_during_batch_of_fifo_queue() ), ]; - $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $transport, null, true); + $consumer = new SqsConsumer( + $this->busDriver->reveal(), + $this->bus, + $this->serializer->reveal(), + $this->sqsTransportNameResolver->reveal(), + null, + null, + true + ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); $this->assertNotContains(['itemIdentifier' => 'e00c848c-2579-4f6a-a006-ccdc2808ed64'], $failures['batchItemFailures']); @@ -316,7 +406,15 @@ public function test_different_message_group_id_failed_during_batch_of_fifo_queu ), ]; - $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $transport, null, true); + $consumer = new SqsConsumer( + $this->busDriver->reveal(), + $this->bus, + $this->serializer->reveal(), + $this->sqsTransportNameResolver->reveal(), + null, + null, + true + ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); $this->assertNotContains(['itemIdentifier' => 'e00c848c-2579-4f6a-a006-ccdc2808ed64'], $failures['batchItemFailures']); @@ -371,6 +469,8 @@ private function sqsRecordWillSuccessfullyBeHandledWithStamps(object $message, s ->shouldBeCalled() ; + $this->sqsTransportNameResolver->__invoke(Argument::cetera())->willReturn($transport); + return $this->aSqsRecord($message, $messageId, $body, $specialHeaders, $fifo, $messageGroupId); } From 74111e7b4c3709de2640cdadb0814bb18d822dd1 Mon Sep 17 00:00:00 2001 From: Robin Lehrmann Date: Mon, 12 Feb 2024 20:54:44 +0100 Subject: [PATCH 2/3] Added documentation for automatic transport recognition --- README.md | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index eda26ad..d398af5 100644 --- a/README.md +++ b/README.md @@ -175,8 +175,6 @@ services: public: true autowire: true arguments: - # Pass the transport name used in config/packages/messenger.yaml - $transportName: 'async' # true enables partial SQS batch failure # Enabling this without proper SQS config will consider all your messages successful # See https://bref.sh/docs/function/handlers.html#partial-batch-response for more details. @@ -296,9 +294,6 @@ services: Bref\Symfony\Messenger\Service\Sns\SnsConsumer: public: true autowire: true - arguments: - # Pass the transport name used in config/packages/messenger.yaml - $transportName: 'async' ``` Now, anytime a message is dispatched to SNS, the Lambda function will be called. The Bref consumer class will put back the message into Symfony Messenger to be processed. @@ -390,7 +385,6 @@ services: public: true autowire: true arguments: - # Pass the transport name used in config/packages/messenger.yaml $transportName: 'async' # Optionnally, if you have different buses in config/packages/messenger.yaml, set $bus like below: # $bus: '@event.bus' @@ -456,6 +450,34 @@ services: region: us-east-1 ``` +### Automatic transport recognition + +Automatic transport recognition is primarily handled by default through TransportNameResolvers for SNS and SQS, +ensuring that the transport name is automatically passed to your message handlers. +However, in scenarios where you need to manually specify the transport name or adjust the default behavior, +you can do so by setting the `$transportName` parameter in your service definitions within the config/services.yaml file. +This parameter should match the transport name defined in your config/packages/messenger.yaml. +For instance, for a SNSConsumer, you would configure it as follows: + +```yaml +# config/packages/messenger.yaml +framework: + messenger: + transports: + async: '%env(MESSENGER_TRANSPORT_DSN)%' +``` + +```yaml +# config/services.yaml +services: + Bref\Symfony\Messenger\Service\Sns\SnsConsumer: + public: true + autowire: true + arguments: + # Pass the transport name used in config/packages/messenger.yaml + $transportName: 'async' +``` + ### Disabling transports By default, this package registers Symfony Messenger transports for SQS, SNS and EventBridge. @@ -481,6 +503,5 @@ services: public: true autowire: true arguments: - $transportName: 'async' $serializer: '@Happyr\MessageSerializer\Serializer' ``` From 7598b6ac9b6147ec95027e7015d2440af69ce916 Mon Sep 17 00:00:00 2001 From: Robin Lehrmann Date: Tue, 13 Feb 2024 15:55:14 +0100 Subject: [PATCH 3/3] Removed BC break and moved $transportName to correct place again --- src/Service/Sns/SnsConsumer.php | 25 ++++++++--- src/Service/Sqs/SqsConsumer.php | 18 ++++++-- tests/Unit/Service/Sns/SnsConsumerTest.php | 7 ++- tests/Unit/Service/Sqs/SqsConsumerTest.php | 51 +++++++++++++++------- 4 files changed, 72 insertions(+), 29 deletions(-) diff --git a/src/Service/Sns/SnsConsumer.php b/src/Service/Sns/SnsConsumer.php index b934083..92486ec 100644 --- a/src/Service/Sns/SnsConsumer.php +++ b/src/Service/Sns/SnsConsumer.php @@ -5,7 +5,9 @@ use Bref\Context\Context; use Bref\Event\Sns\SnsEvent; use Bref\Event\Sns\SnsHandler; +use Bref\Event\Sns\SnsRecord; use Bref\Symfony\Messenger\Service\BusDriver; +use LogicException; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -15,19 +17,19 @@ final class SnsConsumer extends SnsHandler private $bus; /** @var SerializerInterface */ protected $serializer; - /** @var SnsTransportNameResolver */ - private $transportNameResolver; - /** @var BusDriver */ - private $busDriver; /** @var string|null */ private $transportName; + /** @var BusDriver */ + private $busDriver; + /** @var SnsTransportNameResolver|null */ + private $transportNameResolver; public function __construct( BusDriver $busDriver, MessageBusInterface $bus, SerializerInterface $serializer, - SnsTransportNameResolver $transportNameResolver, - string $transportName = null + string $transportName = null, + SnsTransportNameResolver $transportNameResolver = null, ) { $this->busDriver = $busDriver; $this->bus = $bus; @@ -43,7 +45,16 @@ public function handleSns(SnsEvent $event, Context $context): void $headers = isset($attributes['Headers']) ? $attributes['Headers']->getValue() : '[]'; $envelope = $this->serializer->decode(['body' => $record->getMessage(), 'headers' => json_decode($headers, true)]); - $this->busDriver->putEnvelopeOnBus($this->bus, $envelope, $this->transportName ?? ($this->transportNameResolver)($record)); + $this->busDriver->putEnvelopeOnBus($this->bus, $envelope, $this->resolveTransportName($record)); } } + + private function resolveTransportName(SnsRecord $record): string + { + if (null === $this->transportName && null === $this->transportNameResolver) { + throw new LogicException('You need to set $transportNameResolver or $transportName.'); + } + + return $this->transportName ?? ($this->transportNameResolver)($record); + } } diff --git a/src/Service/Sqs/SqsConsumer.php b/src/Service/Sqs/SqsConsumer.php index 100f3a7..0cc7596 100644 --- a/src/Service/Sqs/SqsConsumer.php +++ b/src/Service/Sqs/SqsConsumer.php @@ -7,6 +7,7 @@ use Bref\Event\Sqs\SqsHandler; use Bref\Event\Sqs\SqsRecord; use Bref\Symfony\Messenger\Service\BusDriver; +use LogicException; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp; @@ -38,18 +39,18 @@ public function __construct( BusDriver $busDriver, MessageBusInterface $bus, SerializerInterface $serializer, - SqsTransportNameResolver $transportNameResolver, string $transportName = null, LoggerInterface $logger = null, - bool $partialBatchFailure = false + bool $partialBatchFailure = false, + SqsTransportNameResolver $transportNameResolver = null ) { $this->busDriver = $busDriver; $this->bus = $bus; $this->serializer = $serializer; - $this->transportNameResolver = $transportNameResolver; $this->transportName = $transportName; $this->logger = $logger ?? new NullLogger(); $this->partialBatchFailure = $partialBatchFailure; + $this->transportNameResolver = $transportNameResolver; } public function handleSqs(SqsEvent $event, Context $context): void @@ -97,7 +98,7 @@ public function handleSqs(SqsEvent $event, Context $context): void if ('' !== $context->getTraceId()) { $stamps[] = new AmazonSqsXrayTraceHeaderStamp($context->getTraceId()); } - $this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName ?? ($this->transportNameResolver)($record)); + $this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->resolveTransportName($record)); } catch (UnrecoverableExceptionInterface $exception) { $this->logger->error(sprintf('SQS record with id "%s" failed to be processed. But failure was marked as unrecoverable. Message will be acknowledged.', $record->getMessageId())); $this->logger->error($exception); @@ -120,4 +121,13 @@ private function readMessageGroupIdOfRecord(SqsRecord $record): ?string $recordAsArray = $record->toArray(); return $recordAsArray['attributes']['MessageGroupId'] ?? null; } + + private function resolveTransportName(SqsRecord $record): string + { + if (null === $this->transportName && null === $this->transportNameResolver) { + throw new LogicException('You need to set $transportNameResolver or $transportName.'); + } + + return $this->transportName ?? ($this->transportNameResolver)($record); + } } diff --git a/tests/Unit/Service/Sns/SnsConsumerTest.php b/tests/Unit/Service/Sns/SnsConsumerTest.php index 2fecea0..d7c6c21 100644 --- a/tests/Unit/Service/Sns/SnsConsumerTest.php +++ b/tests/Unit/Service/Sns/SnsConsumerTest.php @@ -7,6 +7,7 @@ use Bref\Symfony\Messenger\Service\BusDriver; use Bref\Symfony\Messenger\Service\Sns\SnsConsumer; use Bref\Symfony\Messenger\Service\Sns\SnsTransportNameResolver; +use LogicException; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBus; @@ -60,6 +61,7 @@ public function test_serializer() $this->busDriver, $this->bus, $this->serializer, + null, $this->snsTransportNameResolver ); @@ -86,6 +88,7 @@ public function test_event_with_transport_detection(): void $this->busDriver, $this->bus, $this->serializer, + null, $this->snsTransportNameResolver ); @@ -111,8 +114,8 @@ public function test_event_with_manually_set_transport(): void $this->busDriver, $this->bus, $this->serializer, - $this->snsTransportNameResolver, - 'async' + 'async', + $this->snsTransportNameResolver ); $event = $this->snsEvent($body, $headers); diff --git a/tests/Unit/Service/Sqs/SqsConsumerTest.php b/tests/Unit/Service/Sqs/SqsConsumerTest.php index 00bc566..dd43971 100644 --- a/tests/Unit/Service/Sqs/SqsConsumerTest.php +++ b/tests/Unit/Service/Sqs/SqsConsumerTest.php @@ -63,7 +63,15 @@ public function test_batch_events() ), ]; - $consumer = new SqsConsumer($this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), $this->sqsTransportNameResolver->reveal()); + $consumer = new SqsConsumer( + $this->busDriver->reveal(), + $this->bus, + $this->serializer->reveal(), + null, + null, + false, + $this->sqsTransportNameResolver->reveal() + ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); $this->assertEmpty($failures); } @@ -84,6 +92,9 @@ public function test_event_with_transport_detection(): void $this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), + null, + null, + false, $this->sqsTransportNameResolver->reveal() ); @@ -108,8 +119,10 @@ public function test_event_with_manually_set_transport(): void $this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), + 'async_test', + null, + false, $this->sqsTransportNameResolver->reveal(), - 'async_test' ); $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); @@ -143,6 +156,9 @@ public function test_batch_events_with_failure() $this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), + $transport, + null, + false, $this->sqsTransportNameResolver->reveal() ); @@ -178,10 +194,10 @@ public function test_batch_events_failure_with_partial_batch_failure_enabled() $this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), - $this->sqsTransportNameResolver->reveal(), null, null, - true + true, + $this->sqsTransportNameResolver->reveal() ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); @@ -221,10 +237,10 @@ public function test_batch_events_failure_on_fifo_queue_with_partial_batch_failu $this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), - $this->sqsTransportNameResolver->reveal(), - null, + $transport, null, - true + true, + $this->sqsTransportNameResolver->reveal() ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); @@ -266,6 +282,9 @@ public function test_x_ray_header_is_dispatched_on_bus() $this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), + $transport, + null, + false, $this->sqsTransportNameResolver->reveal() ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', $xrayTraceId)); @@ -305,10 +324,10 @@ public function test_unrecoverable_exception_during_batch() $this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), - $this->sqsTransportNameResolver->reveal(), - null, + $transport, null, - true + true, + $this->sqsTransportNameResolver->reveal(), ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); @@ -356,10 +375,10 @@ public function test_message_group_id_during_batch_of_fifo_queue() $this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), - $this->sqsTransportNameResolver->reveal(), - null, + $transport, null, - true + true, + $this->sqsTransportNameResolver->reveal(), ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', '')); @@ -410,10 +429,10 @@ public function test_different_message_group_id_failed_during_batch_of_fifo_queu $this->busDriver->reveal(), $this->bus, $this->serializer->reveal(), - $this->sqsTransportNameResolver->reveal(), - null, + $transport, null, - true + true, + $this->sqsTransportNameResolver->reveal(), ); $failures = $consumer->handle(['Records' => $sqsRecords], new Context('', 0, '', ''));