Skip to content

Commit

Permalink
Merge pull request #23 from trandangtri/feature/amount-consumed
Browse files Browse the repository at this point in the history
Add more argument: amount of messages to consume
  • Loading branch information
trandangtri authored Dec 12, 2017
2 parents ebdab6b + 33dccab commit 5ed9339
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 27 deletions.
15 changes: 8 additions & 7 deletions Command/QueueWorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use TriTran\SqsQueueBundle\Service\BaseQueue;
Expand All @@ -23,12 +24,8 @@ protected function configure()
{
$this
->setName('tritran:sqs_queue:worker')
->addArgument(
'name',
InputArgument::REQUIRED,
'Queue Name',
null
)
->addArgument('name', InputArgument::REQUIRED, 'Queue Name', null)
->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0)
->setDescription('Start a worker that will listen to a specified SQS queue');
}

Expand All @@ -41,6 +38,10 @@ protected function execute(InputInterface $input, OutputInterface $output)
if (!$this->getContainer()->has(sprintf('tritran.sqs_queue.%s', $queueName))) {
throw new \InvalidArgumentException(sprintf('Queue [%s] does not exist.', $queueName));
}
$amount = $input->getOption('messages');
if ($amount < 0) {
throw new \InvalidArgumentException("The -m option should be null or greater than 0");
}

$io = new SymfonyStyle($input, $output);
$io->title(sprintf('Start listening to queue <comment>%s</comment>', $queueName));
Expand All @@ -50,6 +51,6 @@ protected function execute(InputInterface $input, OutputInterface $output)

/** @var BaseWorker $worker */
$worker = $this->getContainer()->get('tritran.sqs_queue.queue_worker');
$worker->start($queue);
$worker->start($queue, $amount);
}
}
19 changes: 16 additions & 3 deletions Service/BaseWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,33 @@ class BaseWorker
{
use LoggerAwareTrait;

/**
* @var int
*/
private $consumed;

/**
* @param BaseQueue $queue
* @param int $amount
* @param int $limit Zero is all
*/
public function start(BaseQueue $queue, int $limit = 1)
public function start(BaseQueue $queue, int $amount = 0, int $limit = 1)
{
$this->consume($queue, $limit);
$this->consumed = 0;
$this->consume($queue, $amount, $limit);
}

/**
* @param BaseQueue $queue
* @param int $amount
* @param int $limit
*/
private function consume(BaseQueue $queue, int $limit = 1)
private function consume(BaseQueue $queue, int $amount = 0, int $limit = 1)
{
while (true) {
if ($amount && $this->consumed >= $amount) {
break;
}
$this->fetchMessage($queue, $limit);
}
}
Expand All @@ -45,6 +56,8 @@ private function fetchMessage(BaseQueue $queue, int $limit = 1)

$messages->rewind();
while ($messages->valid()) {
$this->consumed++;

/** @var Message $message */
$message = $messages->current();

Expand Down
22 changes: 10 additions & 12 deletions Tests/Functional/Command/QueueAttCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@ class QueueAttCommandTest extends KernelTestCase
*/
public function setUp()
{
if ($this->queueManager === null) {
$this->queueManager = $this->getMockBuilder(QueueManager::class)
->disableOriginalConstructor()
->getMock();
$this->queueManager
->expects($this->any())
->method('getQueueAttributes')
->with('my-queue-url')
->willReturn(['att1' => 'value1', 'att2' => 'value2']);

$this->getContainer()->set('tritran.sqs_queue.queue_manager', $this->queueManager);
}
$this->queueManager = $this->getMockBuilder(QueueManager::class)
->disableOriginalConstructor()
->getMock();
$this->queueManager
->expects($this->any())
->method('getQueueAttributes')
->with('my-queue-url')
->willReturn(['att1' => 'value1', 'att2' => 'value2']);

$this->getContainer()->set('tritran.sqs_queue.queue_manager', $this->queueManager);
}

/**
Expand Down
14 changes: 14 additions & 0 deletions Tests/Functional/Command/QueueWorkerCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ public function testExecuteWithNonExistingQueue()
]);
}

/**
* Test: start a worker with an invalid value of amount of messages
*/
public function testExecuteWithInvalidAmountMessages()
{
$commandTester = $this->createCommandTester(new QueueWorkerCommand());

$this->expectException(\InvalidArgumentException::class);
$commandTester->execute([
'name' => 'basic_queue',
'--messages' => -1
]);
}

/**
* Test: Start a worker for listening to a queue
*/
Expand Down
7 changes: 2 additions & 5 deletions Tests/app/KernelTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
namespace TriTran\SqsQueueBundle\Tests\app;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Bundle\FrameworkBundle\Console\Application;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase as SymfonyKernelTestCase;
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\DependencyInjection\ContainerInterface;

Expand Down Expand Up @@ -54,10 +54,7 @@ protected function getContainer($reinitialize = false, array $kernelOptions = []
*/
public function createCommandTester(ContainerAwareCommand $command)
{
$kernel = static::createKernel();
$kernel->boot();

$application = new Application($kernel);
$application = new Application();
$command->setContainer($this->getContainer());
$application->add($command);

Expand Down

0 comments on commit 5ed9339

Please sign in to comment.