Skip to content

Commit

Permalink
Implement the SQS transport and consumer for Messenger
Browse files Browse the repository at this point in the history
  • Loading branch information
mnapoli committed Dec 4, 2019
1 parent 03cd6cf commit 1626934
Show file tree
Hide file tree
Showing 14 changed files with 440 additions and 43 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/vendor/
/composer.phar
/composer.lock
/var/
.phpunit.result.cache
134 changes: 98 additions & 36 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,57 +1,119 @@
# My Awesome Project
Bridge to use Symfony Messenger with SQS on AWS Lambda with [Bref](https://bref.sh).

This is the catchphrase: what does this project do and how is it unique?
## Introduction

[![Build Status](https://img.shields.io/travis/com/PHP-DI/PHP-DI/master.svg?style=flat-square)](https://travis-ci.com/PHP-DI/PHP-DI)
[![Latest Version](https://img.shields.io/github/release/PHP-DI/PHP-DI.svg?style=flat-square)](https://packagist.org/packages/PHP-DI/php-di)
[![Total Downloads](https://img.shields.io/packagist/dt/PHP-DI/PHP-DI.svg?style=flat-square)](https://packagist.org/packages/PHP-DI/php-di)
TODO

Here is an additional quick introduction, if necessary.
## Installation

## Why?
This guide assumes that:

Why does this project exist? Come on, don't delete this part. Fill it.
Yes it's hard, but it's perhaps the most important part of the README.
- Symfony is installed
- Symfony Messenger is installed
- Bref is installed and [configured to deploy Symfony](https://bref.sh/docs/frameworks/symfony.html)

As to why *this* project exist, it's to serve as a template for future open
source PHP projects. Of course, feel free to fork it and make your own recipe.
First, install this package:

## Installation
```
composer require bref/symfony-messenger-sqs
```

Next, register the bundle in `config/bundles.php`:

```php
return [
...
Bref\Messenger\BrefMessengerBundle::class => ['all' => true],
];
```

Describe how to install the project/library/framework/…
Next, configure Symfony Messenger to dispatch a message via SQS:

Make sure your installation instructions work by testing them!
```yaml
# config/packages/messenger.yaml

## Usage
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
'App\Message\MyMessage': async
```
Describe how to use the project. A gif or a short code example is the best
way to show how it works. Also keep paragraphs short and sentences simple: not
everybody speaks english well.
Here, the `MyMessage` class will be dispatch to the `async` transport. We can now configure the `async` transport to use our SQS queue.

For the sake of the example here is how you can use this project template
as a basis for your own repository:
To do that, let's configure the `MESSENGER_TRANSPORT_DSN` environment variable to contain the URL of the queue:

```bash
git clone https://github.com/mnapoli/project-template.git my-project
cd my-project
# Remove the git repository metadata
rm -rf .git/
# Start a brand new repository
git init
git add .
```dotenv
MESSENGER_TRANSPORT_DSN=sqs://sqs.us-east-1.amazonaws.com/123456789101/my-queue
```

Easy peasy! Now you just have to code.
**Watch out:** the SQS URL _must start_ with `sqs://` instead of `https://`. This prefix is the way Symfony Messenger works. That means that you must replace the beginning of the SQS URL that AWS will give you.

### Sending messages

Make sure your examples work by testing them! I didn't test mine and I should feel ashamed.
Now that Messenger is configured with SQS, we can send messages using the `MessageBusInterface`. For example, in a controller:

## Contributing
```php
class DefaultController extends AbstractController
{
public function index()
{
$this->dispatchMessage(new App\Message\MyMessage());
}
}
```

Read [the Symfony documentation to learn more](https://symfony.com/doc/current/messenger.html#dispatching-the-message).

See the [CONTRIBUTING](./.github/CONTRIBUTING.md) file.
### Processing message

## License
Messages are sent to SQS, we now need to process those messages asynchronously.

Come on, [choose a license](http://choosealicense.com/) already! If you don't know or don't
care, the MIT license is the most widely used license.
We can create a Lambda to do that in `serverless.yml`:

For *this* project, I choose […drumroll…] the [Do What the Fuck You Want to Public License](http://www.wtfpl.net/).
```yaml
functions:
worker:
handler: consumer.php
timeout: 120 # in seconds
reservedConcurrency: 5 # max. 5 messages processed in parallel
layers:
- ${bref:layer.php-73}
events:
- sqs:
arn: arn:aws:sqs:us-east-1:123456789101:my-queue
# Only 1 item at a time to simplify error handling
batchSize: 1
```

The Lambda handler will be `consumer.php`, a file we must create:

```php
<?php declare(strict_types=1);
use Bref\Messenger\Sqs\SqsConsumer;
require __DIR__ . '/config/bootstrap.php';
lambda(function ($event) {
$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();
$sqsConsumer = $kernel->getContainer()->get(SqsConsumer::class);
$sqsConsumer->consumeLambdaEvent($event);
});
```

Finally, we must configure the `SqsConsumer` service in `config/services.yaml` (this configuration relies on autowiring being enabled by default):

```yaml
services:
...
Bref\Messenger\Sqs\SqsConsumer:
arguments:
# Inject the transport name used in config/packages/messenger.yaml
$transportName: 'async'
public: true
```
22 changes: 15 additions & 7 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
{
"name": "mnapoli/myproject",
"description": "Give it a nice description!",
"keywords": [],
"name": "bref/symfony-messenger-sqs",
"description": "Symfony Messenger bridge to run with SQS on AWS Lambda with Bref",
"keywords": ["bref", "symfony", "messenger", "sqs", "aws", "aws-lambda"],
"license": "MIT",
"type": "library",
"autoload": {
"psr-4": {
"MyProject\\": "src/"
"Bref\\Messenger\\": "src/"
}
},
"autoload-dev": {
"psr-4": {
"MyProject\\Test\\": "tests/"
"Bref\\Messenger\\Test\\": "tests/"
}
},
"require": {
"php": "^7.3"
"php": "^7.3",
"ext-json": "*",
"aws/aws-sdk-php": "^3.127",
"symfony/messenger": "^5.0",
"symfony/config": "^5.0",
"symfony/dependency-injection": "^5.0",
"symfony/http-kernel": "^5.0",
"symfony/yaml": "^5.0"
},
"require-dev": {
"phpunit/phpunit": "^8.0",
"mnapoli/hard-mode": "^0.2.0",
"phpstan/phpstan": "^0.12.0"
"phpstan/phpstan": "^0.12.0",
"symfony/framework-bundle": "^5.0"
}
}
Empty file removed src/.gitkeep
Empty file.
9 changes: 9 additions & 0 deletions src/BrefMessengerBundle.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php declare(strict_types=1);

namespace Bref\Messenger;

use Symfony\Component\HttpKernel\Bundle\Bundle;

class BrefMessengerBundle extends Bundle
{
}
17 changes: 17 additions & 0 deletions src/DependencyInjection/BrefMessengerExtension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php declare(strict_types=1);

namespace Bref\Messenger\DependencyInjection;

use Symfony\Component\Config\FileLocator;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;

class BrefMessengerExtension extends Extension
{
public function load(array $configs, ContainerBuilder $container): void
{
$loader = new YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
$loader->load('services.yaml');
}
}
5 changes: 5 additions & 0 deletions src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
services:
Bref\Messenger\Sqs\SqsTransportFactory:
tags: ['messenger.transport_factory']
arguments:
$sqs: '@Aws\Sqs\SqsClient'
59 changes: 59 additions & 0 deletions src/Sqs/SqsConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php declare(strict_types=1);

namespace Bref\Messenger\Sqs;

use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

/**
* Class that consumes messages when SQS triggers our Lambda with messages.
*
* This class will put those messages back onto the Symfony Messenger message bus
* so that these messages are handled by their handlers.
*/
class SqsConsumer
{
/** @var MessageBusInterface */
private $bus;
/** @var SerializerInterface */
private $serializer;
/** @var LoggerInterface */
private $logger;
/** @var string */
private $transportName;

public function __construct(
MessageBusInterface $bus,
SerializerInterface $serializer,
LoggerInterface $logger,
string $transportName
) {
$this->bus = $bus;
$this->serializer = $serializer;
$this->logger = $logger;
$this->transportName = $transportName;
}

public function consumeLambdaEvent($event): void
{
foreach ($event['Records'] as $record) {
$envelope = $this->serializer->decode(['body' => $record['body']]);

$this->consume($envelope);
}
}

private function consume(Envelope $envelope): void
{
$this->bus->dispatch($envelope->with(new ReceivedStamp($this->transportName)));

$message = $envelope->getMessage();
$this->logger->info('{class} was handled successfully.', [
'class' => get_class($message),
'message' => $message,
]);
}
}
73 changes: 73 additions & 0 deletions src/Sqs/SqsTransport.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php declare(strict_types=1);

namespace Bref\Messenger\Sqs;

use Aws\Sqs\SqsClient;
use Exception;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Throwable;

class SqsTransport implements TransportInterface
{
/** @var SerializerInterface */
private $serializer;
/** @var SqsClient */
private $sqs;
/** @var string */
private $queueUrl;

public function __construct(SqsClient $sqs, ?SerializerInterface $serializer, string $queueUrl)
{
$this->sqs = $sqs;
$this->serializer = $serializer ?? new PhpSerializer();
$this->queueUrl = $queueUrl;
}

public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);

$headers = $encodedMessage['headers'] ?? [];
$arguments = [
'MessageAttributes' => [
'Headers' => [
'DataType' => 'String',
'StringValue' => json_encode($headers, JSON_THROW_ON_ERROR),
],
],
'MessageBody' => $encodedMessage['body'],
'QueueUrl' => $this->queueUrl,
];

try {
$result = $this->sqs->sendMessage($arguments);
} catch (Throwable $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

if ($result->hasKey('MessageId') === false) {
throw new TransportException('Could not add a message to the SQS queue');
}

return $envelope;
}

public function get(): iterable
{
throw new Exception('Not implemented');
}

public function ack(Envelope $envelope): void
{
throw new Exception('Not implemented');
}

public function reject(Envelope $envelope): void
{
throw new Exception('Not implemented');
}
}
Loading

0 comments on commit 1626934

Please sign in to comment.