Skip to content

Commit

Permalink
AmqpQueueProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
TomK committed Jun 10, 2015
1 parent 88859ee commit 33d0c81
Show file tree
Hide file tree
Showing 8 changed files with 741 additions and 17 deletions.
30 changes: 18 additions & 12 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
{
"name": "packaged/queue",
"license": "MIT",
"authors": [
"name": "packaged/queue",
"license": "MIT",
"authors": [
{
"name": "Brooke Bryan",
"name": "Tom Kay",
"email": "[email protected]"
},
{
"name": "Brooke Bryan",
"email": "[email protected]"
}
],
"require": {
"php": ">=5.4.0"
"require": {
"php": ">=5.4.0",
"videlalvaro/php-amqplib": "~2.5",
"packaged/config": "~1.1"
},
"require-dev": {
"phpunit/phpunit": "3.7.*"
"require-dev": {
"phpunit/phpunit": "~4.7"
},
"autoload": {
"autoload": {
"psr-4": {
"Packaged\\Queue\\": "src"
"Packaged\\Queue\\": "src",
"Packaged\\Queue\\Tests\\": "tests"
}
},
"minimum-stability": "dev"
}
}
15 changes: 11 additions & 4 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@
bootstrap="vendor/autoload.php"
colors="true"
verbose="true"
strict="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
syntaxCheck="false"
>
syntaxCheck="false">
<testsuites>
<testsuite>
<directory>tests</directory>
</testsuite>
</testsuites>
</phpunit>
<filter>
<whitelist>
<directory suffix=".php">src</directory>
</whitelist>
<blacklist>
<directory suffix=".php">vendor</directory>
<directory suffix=".php">tests</directory>
</blacklist>
</filter>
</phpunit>
7 changes: 7 additions & 0 deletions src/IBatchQueueProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php
namespace Packaged\Queue;

interface IBatchQueueProvider extends IQueueProvider
{
public function pushBatch(array $batch);
}
8 changes: 8 additions & 0 deletions src/IDelayedBatchQueueProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php
namespace Packaged\Queue;

interface IDelayedBatchQueueProvider
extends IQueueProvider, IBatchQueueProvider, IDelayedQueueProvider
{
public function delayedPushBatch(array $batch, $delay);
}
4 changes: 3 additions & 1 deletion src/IQueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ interface IQueueProvider
{
public function push($data);

public function consume();
public function consume(callable $callback);

public function batchConsume(callable $callback, $batchSize);
}
103 changes: 103 additions & 0 deletions src/Provider/AbstractQueueProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?php
namespace Packaged\Queue\Provider;

use Packaged\Config\ConfigSectionInterface;
use Packaged\Config\ConfigurableInterface;
use Packaged\Config\Provider\ConfigSection;
use Packaged\Queue\IQueueProvider;

abstract class AbstractQueueProvider
implements IQueueProvider, ConfigurableInterface
{
protected $_config;
protected $_queueName;
protected $_consumerId;

protected $_batchData = [];

/**
* Configure the data connection
*
* @param ConfigSectionInterface $configuration
*
* @return static
*/
public function configure(ConfigSectionInterface $configuration)
{
$this->_config = $configuration;
return $this;
}

/**
* @return ConfigSectionInterface
*/
public function config()
{
if(!$this->_config)
{
$this->_config = new ConfigSection();
}
return $this->_config;
}

final protected function __construct()
{
$this->_construct();
}

protected function _construct()
{
}

/**
* @param $queueName
*
* @return static
*/
public static function create($queueName)
{
$object = new static;
$object->_queueName = $queueName;
return $object;
}

protected function _getQueueName()
{
return $this->_queueName;
}

protected function _getConsumerId()
{
if($this->_consumerId === null)
{
$this->_consumerId =
$this->_queueName . ':' . gethostname() . ':' . getmypid();
}
return $this->_consumerId;
}

public function batchConsume(callable $callback, $batchSize)
{
$this->_batchData = [];
while(count($this->_batchData) < $batchSize)
{
if(!$this->consume([$this, '_processBatchMessage']))
{
$this->_log('No more messages in the queue');
break;
}
}
$callback($this->_batchData);
$this->_batchData = [];
}

protected function _processBatchMessage($msg)
{
$this->_batchData[] = $msg;
}

protected function _log($message)
{
error_log('Queue (' . $this->_getQueueName() . '): ' . $message);
}
}
Loading

0 comments on commit 33d0c81

Please sign in to comment.