Skip to content

Commit

Permalink
Merge pull request #63 from hadeli/feature-add-xray-header
Browse files Browse the repository at this point in the history
  • Loading branch information
mnapoli authored Aug 11, 2022
2 parents 91e2abb + 91fce8e commit 3f354cc
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 9 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ jobs:
timeout-minutes: 15
strategy:
matrix:
php: ['8.0', '7.4', '7.3']
php: ['8.0', '7.4']
dependency-version: ['']
include:
- php: '7.3'
dependency-version: '--prefer-lowest'

steps:
- name: Checkout
uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
}
},
"require": {
"php": ">=7.3",
"php": ">=7.4",
"ext-json": "*",
"symfony/messenger": "^4.3 || ^5.0 || ^6.0",
"symfony/config": "^4.3 || ^5.0 || ^6.0",
Expand Down
9 changes: 7 additions & 2 deletions src/Service/Sqs/SqsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Bref\Event\Sqs\SqsHandler;
use Bref\Symfony\Messenger\Service\BusDriver;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsXrayTraceHeaderStamp;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

Expand Down Expand Up @@ -45,7 +46,6 @@ public function handleSqs(SqsEvent $event, Context $context): void
$headers = json_decode($attributes[self::MESSAGE_ATTRIBUTE_NAME]['stringValue'], true);
unset($attributes[self::MESSAGE_ATTRIBUTE_NAME]);
}

foreach ($attributes as $name => $attribute) {
if ($attribute['dataType'] !== 'String') {
continue;
Expand All @@ -55,7 +55,12 @@ public function handleSqs(SqsEvent $event, Context $context): void

$envelope = $this->serializer->decode(['body' => $record->getBody(), 'headers' => $headers]);

$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(new AmazonSqsReceivedStamp($record->getMessageId())), $this->transportName);
$stamps = [new AmazonSqsReceivedStamp($record->getMessageId())];

if ('' !== $context->getTraceId()) {
$stamps[] = new AmazonSqsXrayTraceHeaderStamp($context->getTraceId());
}
$this->busDriver->putEnvelopeOnBus($this->bus, $envelope->with(...$stamps), $this->transportName);
}
}
}
70 changes: 68 additions & 2 deletions tests/Unit/Service/Sqs/SqsConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Bref\Symfony\Messenger\Service\BusDriver;
use Bref\Symfony\Messenger\Service\Sqs\SqsConsumer;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsXrayTraceHeaderStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -31,12 +33,19 @@ public function testSerializer()
'Content-Type' => 'application/json',
]);
$body = 'Test message.';
$messageId = 'e00c848c-2579-4f6a-a006-ccdc2808ed64';

$serializer->expects($this->once())
->method('decode')
->with(['body' => $body, 'headers' => $headers])
->willReturn(new Envelope(new \stdClass));

$busDriver->expects($this->once())
->method('putEnvelopeOnBus')
->with($bus, new Envelope(
new \stdClass(),
[
new AmazonSqsReceivedStamp($messageId),
]), 'async');
$consumer = new SqsConsumer($busDriver, $bus, $serializer, 'async');
$event = new SqsEvent([
'Records' => [
Expand All @@ -53,11 +62,68 @@ public function testSerializer()
],
],
'eventSource'=>'aws:sqs',
'messageId' => 'e00c848c-2579-4f6a-a006-ccdc2808ed64',
'messageId' => $messageId,
],
],
]);

$consumer->handleSqs($event, new Context('', 0, '', ''));
}

public function testSerializerWithXRayHeader()
{
$busDriver = $this->getMockBuilder(BusDriver::class)
->disableOriginalConstructor()
->onlyMethods(['putEnvelopeOnBus'])
->getMock();

$bus = new MessageBus;
$serializer = $this->getMockBuilder(SerializerInterface::class)
->disableOriginalConstructor()
->onlyMethods(['encode', 'decode'])
->getMock();

$specialHeaders = ['Special\Header\Name' => 'some data'];

$headers = array_merge($specialHeaders, [
'Content-Type' => 'application/json',
]);
$body = 'Test message.';
$xrayTraceId = '709857d6-17c2-11ed-861d-0242ac120002';
$messageId = 'e00c848c-2579-4f6a-a006-ccdc2808ed64';
$serializer->expects($this->once())
->method('decode')
->with(['body' => $body, 'headers' => $headers])
->willReturn(new Envelope(new \stdClass));
$busDriver->expects($this->once())
->method('putEnvelopeOnBus')
->with($bus, new Envelope(
new \stdClass(),
[
new AmazonSqsReceivedStamp($messageId),
new AmazonSqsXrayTraceHeaderStamp($xrayTraceId)
]), 'async');
$consumer = new SqsConsumer($busDriver, $bus, $serializer, 'async');
$event = new SqsEvent([
'Records' => [
[
'body' => $body,
'messageAttributes' => [
'Content-Type' => [
'dataType' => 'String',
'stringValue' => 'application/json',
],
'X-Symfony-Messenger' => [
'dataType' => 'String',
'stringValue' => json_encode($specialHeaders),
],
],
'eventSource'=>'aws:sqs',
'messageId' => $messageId,
],
],
]);

$consumer->handleSqs($event, new Context('', 0, '', $xrayTraceId));
}
}

0 comments on commit 3f354cc

Please sign in to comment.