Skip to content

Commit

Permalink
Merge pull request #62 from BedrockStreaming/feat/drop-symfony32
Browse files Browse the repository at this point in the history
  • Loading branch information
Yokann authored Aug 8, 2023
2 parents a9e4a6e + 2d938a1 commit ffe058e
Show file tree
Hide file tree
Showing 23 changed files with 151 additions and 424 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
matrix:
php-version: ['7.4', '8.0', '8.1' ]
symfony-version: ['^3.4', '^4.4', '^5.0']
symfony-version: ['^4.4', '^5.0']
fail-fast: false
steps:
- uses: actions/checkout@master
Expand Down
10 changes: 5 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
"require" : {
"php": ">=7.4",
"ext-amqp": "*",
"symfony/dependency-injection": "^3.4 || ^4.3 || ^5.0",
"symfony/framework-bundle": "^3.4 || ^4.3 || ^5.0",
"symfony/http-kernel": "^3.4 || ^4.3 || ^5.0",
"symfony/yaml": "^3.4 || ^4.3 || ^5.0",
"twig/twig": "^1.31 || ^2.0 || ^3.0"
"symfony/dependency-injection": "^4.4 || ^5.0",
"symfony/framework-bundle": "^4.4 || ^5.0",
"symfony/http-kernel": "^4.4 || ^5.0",
"symfony/yaml": "^4.4 || ^5.0",
"twig/twig": "^2.13 || ^3.0"
},
"require-dev" : {
"atoum/atoum": "~4.0",
Expand Down
32 changes: 12 additions & 20 deletions src/AmqpBundle/Amqp/AbstractAmqp.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,30 @@

namespace M6Web\Bundle\AmqpBundle\Amqp;

use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
* Abstract AMQP.
*/
abstract class AbstractAmqp
{
/**
* Event dispatcher.
*
* @var object
*/
protected $eventDispatcher = null;
protected ?EventDispatcherInterface $eventDispatcher = null;

/**
* Class of the event notifier.
*
* @var string
* @var ?class-string $eventClass
*/
protected $eventClass = null;
protected ?string $eventClass = null;

/**
* Notify an event to the event dispatcher.
*
* @param string $command The command name
* @param array $arguments Args of the command
* @param mixed $return Return value of the command
* @param int $time Exec time
* @param float $time Exec time
*/
protected function notifyEvent($command, $arguments, $return, $time = 0)
protected function notifyEvent(string $command, array $arguments, $return, float $time = 0)
{
if ($this->eventDispatcher) {
$event = new $this->eventClass();
Expand All @@ -38,7 +34,7 @@ protected function notifyEvent($command, $arguments, $return, $time = 0)
->setReturn($return)
->setExecutionTime($time);

$this->eventDispatcher->dispatch('amqp.command', $event);
$this->eventDispatcher->dispatch($event, 'amqp.command');
}
}

Expand All @@ -51,7 +47,7 @@ protected function notifyEvent($command, $arguments, $return, $time = 0)
*
* @return mixed
*/
protected function call($object, $name, array $arguments = [])
protected function call(object $object, string $name, array $arguments = [])
{
$start = microtime(true);

Expand All @@ -65,17 +61,13 @@ protected function call($object, $name, array $arguments = [])
/**
* Set an event dispatcher to notify amqp command.
*
* @param object $eventDispatcher The eventDispatcher object, which implement the notify method
* @param string $eventClass The event class used to create an event and send it to the event dispatcher
* @param EventDispatcherInterface $eventDispatcher The eventDispatcher object, which implement the notify method
* @param string $eventClass The event class used to create an event and send it to the event dispatcher
*
* @throws \Exception
*/
public function setEventDispatcher($eventDispatcher, $eventClass)
public function setEventDispatcher(EventDispatcherInterface $eventDispatcher, string $eventClass)
{
if (!is_object($eventDispatcher) || !method_exists($eventDispatcher, 'dispatch')) {
throw new Exception('The EventDispatcher must be an object and implement a dispatch method');
}

$class = new \ReflectionClass($eventClass);
if (!$class->implementsInterface('\M6Web\Bundle\AmqpBundle\Event\DispatcherInterface')) {
throw new Exception('The Event class : '.$eventClass.' must implement DispatcherInterface');
Expand Down
60 changes: 17 additions & 43 deletions src/AmqpBundle/Amqp/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,11 @@
*/
class Consumer extends AbstractAmqp
{
/**
* @var \AMQPQueue
*/
protected $queue = null;

/**
* @var array
*/
protected $queueOptions = [];
protected \AMQPQueue $queue;

protected array $queueOptions = [];

/**
* @param \AMQPQueue $queue Amqp Queue
* @param array $queueOptions Queue options
*/
public function __construct(\AMQPQueue $queue, array $queueOptions)
{
$this->queue = $queue;
Expand All @@ -37,18 +28,17 @@ public function __construct(\AMQPQueue $queue, array $queueOptions)
*
* @param int $flags MQP_AUTOACK or AMQP_NOPARAM
*
* @throws \AMQPChannelException if the channel is not open
* @throws \AMQPConnectionException if the connection to the broker was lost
*
* @return \AMQPEnvelope|bool
* @throws \AMQPChannelException if the channel is not open
*/
public function getMessage($flags = AMQP_AUTOACK)
public function getMessage(int $flags = AMQP_AUTOACK): ?\AMQPEnvelope
{
$envelope = $this->call($this->queue, 'get', [$flags]);
$envelope = $envelope === false ? null : $envelope;

if ($this->eventDispatcher) {
$preRetrieveEvent = new PreRetrieveEvent($envelope);
$this->eventDispatcher->dispatch(PreRetrieveEvent::NAME, $preRetrieveEvent);
$this->eventDispatcher->dispatch($preRetrieveEvent, PreRetrieveEvent::NAME);

return $preRetrieveEvent->getEnvelope();
}
Expand All @@ -62,17 +52,15 @@ public function getMessage($flags = AMQP_AUTOACK)
* @param string $deliveryTag delivery tag of last message to ack
* @param int $flags AMQP_MULTIPLE or AMQP_NOPARAM
*
* @return bool
*
* @throws \AMQPChannelException if the channel is not open
* @throws \AMQPConnectionException if the connection to the broker was lost
*/
public function ackMessage($deliveryTag, $flags = AMQP_NOPARAM)
public function ackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): bool
{
if ($this->eventDispatcher) {
$ackEvent = new AckEvent($deliveryTag, $flags);

$this->eventDispatcher->dispatch(AckEvent::NAME, $ackEvent);
$this->eventDispatcher->dispatch($ackEvent,AckEvent::NAME);
}

return $this->call($this->queue, 'ack', [$deliveryTag, $flags]);
Expand All @@ -84,17 +72,15 @@ public function ackMessage($deliveryTag, $flags = AMQP_NOPARAM)
* @param string $deliveryTag delivery tag of last message to nack
* @param int $flags AMQP_NOPARAM or AMQP_REQUEUE to requeue the message(s)
*
* @throws \AMQPChannelException if the channel is not open
* @throws \AMQPConnectionException if the connection to the broker was lost
*
* @return bool
* @throws \AMQPChannelException if the channel is not open
*/
public function nackMessage($deliveryTag, $flags = AMQP_NOPARAM)
public function nackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): bool
{
if ($this->eventDispatcher) {
$nackEvent = new NackEvent($deliveryTag, $flags);

$this->eventDispatcher->dispatch(NackEvent::NAME, $nackEvent);
$this->eventDispatcher->dispatch($nackEvent, NackEvent::NAME);
}

return $this->call($this->queue, 'nack', [$deliveryTag, $flags]);
Expand All @@ -105,26 +91,22 @@ public function nackMessage($deliveryTag, $flags = AMQP_NOPARAM)
*
* @throws \AMQPChannelException if the channel is not open
* @throws \AMQPConnectionException if the connection to the broker was lost
*
* @return bool
*/
public function purge()
public function purge(): bool
{
if ($this->eventDispatcher) {
$purgeEvent = new PurgeEvent($this->queue);

$this->eventDispatcher->dispatch(PurgeEvent::NAME, $purgeEvent);
$this->eventDispatcher->dispatch($purgeEvent, PurgeEvent::NAME);
}

return $this->call($this->queue, 'purge');
}

/**
* Get the current message count.
*
* @return int
*/
public function getCurrentMessageCount()
public function getCurrentMessageCount(): int
{
// Save the current queue flags and setup the queue in passive mode
$flags = $this->queue->getFlags();
Expand All @@ -139,20 +121,12 @@ public function getCurrentMessageCount()
return $messagesCount;
}

/**
* @return \AMQPQueue
*/
public function getQueue()
public function getQueue(): \AMQPQueue
{
return $this->queue;
}

/**
* @param \AMQPQueue $queue
*
* @return \M6Web\Bundle\AmqpBundle\Amqp\Consumer
*/
public function setQueue(\AMQPQueue $queue)
public function setQueue(\AMQPQueue $queue): Consumer
{
$this->queue = $queue;

Expand Down
22 changes: 7 additions & 15 deletions src/AmqpBundle/Amqp/DataCollector.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace M6Web\Bundle\AmqpBundle\Amqp;

use M6Web\Bundle\AmqpBundle\Event\DispatcherInterface;
use Symfony\Component\HttpKernel\DataCollector\DataCollector as SymfonyDataCollector;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpFoundation\Request;
Expand All @@ -11,11 +12,6 @@
*/
class DataCollector extends SymfonyDataCollector
{
/**
* @param string $name
*
* Construct the data collector
*/
public function __construct(string $name)
{
$this->data['name'] = $name;
Expand All @@ -25,20 +21,20 @@ public function __construct(string $name)
/**
* Collect the data.
*
* @param Request $request The request object
* @param Response $response The response object
* @param \Exception $exception An exception
* @param Request $request The request object
* @param Response $response The response object
* @param \Throwable|null $exception An exception
*/
public function collect(Request $request, Response $response, \Exception $exception = null)
public function collect(Request $request, Response $response, ?\Throwable $exception = null)
{
}

/**
* Listen for command event.
*
* @param object $event The event object
* @param DispatcherInterface $event The event object
*/
public function onCommand($event)
public function onCommand(DispatcherInterface $event)
{
$this->data['commands'][] = array(
'command' => $event->getCommand(),
Expand All @@ -59,8 +55,6 @@ public function getCommands(): array

/**
* Return the name of the collector.
*
* @return string data collector name
*/
public function getName(): string
{
Expand All @@ -69,8 +63,6 @@ public function getName(): string

/**
* Return total command execution time.
*
* @return float
*/
public function getTotalExecutionTime(): float
{
Expand Down
4 changes: 2 additions & 2 deletions src/AmqpBundle/Amqp/Locator.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
class Locator
{
/** @var Consumer[] */
protected $consumers = [];
protected array $consumers = [];

/** @var Producer[] */
protected $producers = [];
protected array $producers = [];

public function getConsumer(string $id): Consumer
{
Expand Down
Loading

0 comments on commit ffe058e

Please sign in to comment.