diff --git a/composer.json b/composer.json index 4602d60..647400e 100644 --- a/composer.json +++ b/composer.json @@ -1,22 +1,28 @@ { - "name": "packaged/queue", - "license": "MIT", - "authors": [ + "name": "packaged/queue", + "license": "MIT", + "authors": [ { - "name": "Brooke Bryan", + "name": "Tom Kay", + "email": "oridan82@gmail.com" + }, + { + "name": "Brooke Bryan", "email": "brooke@bajb.net" } ], - "require": { - "php": ">=5.4.0" + "require": { + "php": ">=5.4.0", + "videlalvaro/php-amqplib": "~2.5", + "packaged/config": "~1.1" }, - "require-dev": { - "phpunit/phpunit": "3.7.*" + "require-dev": { + "phpunit/phpunit": "~4.7" }, - "autoload": { + "autoload": { "psr-4": { - "Packaged\\Queue\\": "src" + "Packaged\\Queue\\": "src", + "Packaged\\Queue\\Tests\\": "tests" } - }, - "minimum-stability": "dev" + } } diff --git a/phpunit.xml b/phpunit.xml index b18bb74..1bbd68e 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -3,17 +3,24 @@ bootstrap="vendor/autoload.php" colors="true" verbose="true" - strict="true" convertErrorsToExceptions="true" convertNoticesToExceptions="true" convertWarningsToExceptions="true" processIsolation="false" stopOnFailure="false" - syntaxCheck="false" - > + syntaxCheck="false"> tests - \ No newline at end of file + + + src + + + vendor + tests + + + diff --git a/src/IBatchQueueProvider.php b/src/IBatchQueueProvider.php new file mode 100644 index 0000000..30e3059 --- /dev/null +++ b/src/IBatchQueueProvider.php @@ -0,0 +1,7 @@ +_config = $configuration; + return $this; + } + + /** + * @return ConfigSectionInterface + */ + public function config() + { + if(!$this->_config) + { + $this->_config = new ConfigSection(); + } + return $this->_config; + } + + final protected function __construct() + { + $this->_construct(); + } + + protected function _construct() + { + } + + /** + * @param $queueName + * + * @return static + */ + public static function create($queueName) + { + $object = new static; + $object->_queueName = $queueName; + return $object; + } + + protected function _getQueueName() + { + return $this->_queueName; + } + + protected function _getConsumerId() + { + if($this->_consumerId === null) + { + $this->_consumerId = + $this->_queueName . ':' . gethostname() . ':' . getmypid(); + } + return $this->_consumerId; + } + + public function batchConsume(callable $callback, $batchSize) + { + $this->_batchData = []; + while(count($this->_batchData) < $batchSize) + { + if(!$this->consume([$this, '_processBatchMessage'])) + { + $this->_log('No more messages in the queue'); + break; + } + } + $callback($this->_batchData); + $this->_batchData = []; + } + + protected function _processBatchMessage($msg) + { + $this->_batchData[] = $msg; + } + + protected function _log($message) + { + error_log('Queue (' . $this->_getQueueName() . '): ' . $message); + } +} diff --git a/src/Provider/Amqp/AmqpQueueProvider.php b/src/Provider/Amqp/AmqpQueueProvider.php new file mode 100644 index 0000000..81751f4 --- /dev/null +++ b/src/Provider/Amqp/AmqpQueueProvider.php @@ -0,0 +1,431 @@ +_fixedConsumerCallback = [$this, 'consumerCallback']; + } + + public function pushBatch(array $batch, $persistent = null) + { + $this->_refreshConnection(); + $channel = $this->_getChannel(); + $i = 0; + foreach($batch as $data) + { + $i++; + $channel->batch_basic_publish( + $this->_getMessage($data, $persistent), + $this->_getExchangeName(), + $this->_getRoutingKey() + ); + if($i % 100 === 0) + { + $channel->publish_batch(); + } + } + $channel->publish_batch(); + return $this; + } + + public function push($data, $persistent = null) + { + $this->_refreshConnection(); + $msg = $this->_getMessage($data, $persistent); + $this->_getChannel()->basic_publish( + $msg, + $this->_getExchangeName(), + $this->_getRoutingKey() + ); + return $this; + } + + public function consumerCallback(AMQPMessage $msg) + { + $callback = $this->_consumerCallback; + $callback( + json_decode($msg->body), + $msg->delivery_info['delivery_tag'] + ); + } + + public function consume(callable $callback) + { + $this->_consumerCallback = $callback; + $this->_refreshConnection(); + $channel = $this->_getChannel(); + $consumerId = $this->_getConsumerId(); + if(!isset($channel->callbacks[$consumerId])) + { + // register callback for this consumer + $channel->basic_consume( + $this->_getQueueName(), + $consumerId, + false, + false, + false, + false, + $this->_fixedConsumerCallback + ); + } + else + { + // replace callback for this consumer + $channel->callbacks[$consumerId] = $this->_fixedConsumerCallback; + } + try + { + $channel->wait(null, true, $this->_getWaitTime()); + } + catch(AMQPTimeoutException $e) + { + $this->_log('No message received in ' . $this->_getWaitTime() . 's'); + return false; + } + return true; + } + + public function batchConsume(callable $callback, $batchSize) + { + parent::batchConsume($callback, $batchSize); + } + + protected function _processBatchMessage($msg, $tag = null) + { + $this->_batchData[$tag] = $msg; + } + + protected function _getMessage($message, $persistent = null) + { + if($persistent === null) + { + $persistent = $this->_persistentDefault; + } + $persistent = $persistent ? 2 : 1; + if(!isset(self::$_messageCache[$persistent])) + { + self::$_messageCache[$persistent] = new AMQPMessage( + '', + [ + 'content_type' => 'application/json', + 'delivery_mode' => $persistent + ] + ); + self::$_messageCache[$persistent]->serialize_properties(); + } + $msg = clone self::$_messageCache[$persistent]; + $msg->setBody(json_encode($message)); + return $msg; + } + + protected function _getWaitTime() + { + if($this->_waitTime === null) + { + $this->_waitTime = $this->config()->getItem('wait_time', 30); + } + return $this->_waitTime; + } + + protected function _getRoutingKey() + { + return $this->_routingKey; + } + + protected function _getExchangeName() + { + return $this->_exchangeName; + } + + public function purge() + { + $this->_getChannel()->queue_purge($this->_getQueueName()); + return $this; + } + + public static function create( + $queueName, $exchangeName = null, $routingKey = null + ) + { + /** + * @var $object static + */ + $object = parent::create($queueName); + $object->_exchangeName = $exchangeName ?: $queueName; + $object->_routingKey = $routingKey ?: $queueName; + return $object; + } + + public function ack($deliveryTag) + { + $this->_getChannel()->basic_ack($deliveryTag, false); + } + + public function nack($deliveryTag, $requeueFailures = false) + { + $this->_getChannel()->basic_reject($deliveryTag, $requeueFailures); + } + + public function batchAck(array $tagResults, $requeueFailures = false) + { + $channel = $this->_getChannel(); + $lastTag = null; + // optimise ack/nack + if(count(array_filter($tagResults)) >= (count($tagResults) / 2)) + { + // more to ack than to nack, so reject individual ones and ack the rest + foreach($tagResults as $tag => $passed) + { + if(!$passed) + { + $this->nack($tag, $requeueFailures); + } + else + { + $lastTag = $tag; + } + } + if($lastTag) + { + $channel->basic_ack($lastTag, true); + } + } + else + { + // more to nack than to ack, so ack individual ones and nack the rest + foreach($tagResults as $tag => $passed) + { + if($passed) + { + $this->ack($tag); + } + else + { + $lastTag = $tag; + } + } + if($lastTag) + { + $channel->basic_nack($lastTag, true, $requeueFailures); + } + } + } + + /** + * Reconnect periodically for safety + * disconnect / reconnect after x time + */ + protected function _refreshConnection() + { + // check time of last connection + if((time() - $this->_lastConnectTime) >= $this->_reconnectInterval) + { + $this->_log('Connection refresh'); + $this->disconnect(); + } + } + + protected function _getHosts() + { + if(!$this->_hosts) + { + if((!$this->_hostsResetTime) + || (time() - $this->_hostsResetTime > $this->_hostsResetTimeMax) + ) + { + $this->_hostsRetries = $this->_hostsRetriesMax; + $this->_hostsResetTime = time(); + } + if($this->_hostsRetries) + { + $this->_hosts = (array)$this->config()->getItem('hosts', 'localhost'); + $this->_hostsRetries--; + } + else + { + throw new \Exception( + 'All hosts failed to connect ' . $this->_hostsRetriesMax . + ' times within ' . $this->_hostsResetTimeMax . ' seconds' + ); + } + } + shuffle($this->_hosts); + return $this->_hosts; + } + + /** + * @return AMQPStreamConnection + * @throws \Exception + */ + protected function _getConnection() + { + if($this->_connection === null) + { + while(!$this->_connection) + { + $this->_getHosts(); + $host = reset($this->_hosts); + try + { + $this->_connection = new AMQPStreamConnection( + $host, + $this->config()->getItem('port', 5672), + $this->config()->getItem('username', 'guest'), + $this->config()->getItem('password', 'guest') + ); + } + catch(\Exception $e) + { + $this->_log('AMQP host failed to connect (' . $host . ')'); + array_shift($this->_hosts); + } + $this->_persistentDefault = (bool)$this->config()->getItem( + 'persistent', + false + ); + $this->_lastConnectTime = time(); + } + } + return $this->_connection; + } + + /** + * @return AMQPChannel + */ + protected function _getChannel() + { + if($this->_channel === null) + { + $this->_channel = $this->_getConnection()->channel(); + } + return $this->_channel; + } + + public function __destruct() + { + $this->disconnect(); + } + + public function disconnect() + { + try + { + if($this->_channel !== null && $this->_channel instanceof AMQPChannel) + { + $this->_channel->close(); + } + } + catch(\Exception $e) + { + } + $this->_channel = null; + try + { + if($this->_connection !== null && $this->_connection instanceof AbstractConnection) + { + $this->_connection->close(); + } + } + catch(\Exception $e) + { + } + $this->_connection = null; + $this->_exchange = null; + } + + public function setPrefetch($count, $size = 0) + { + $this->_getChannel()->basic_qos($size, $count, false); + return $this; + } + + public function declareQueue() + { + $this->_getChannel()->queue_declare( + $this->_getQueueName(), + (bool)$this->config()->getItem('queue_passive', false), + (bool)$this->config()->getItem('queue_durable', true), + (bool)$this->config()->getItem('queue_exclusive', false), + (bool)$this->config()->getItem('queue_autodelete', false), + (bool)$this->config()->getItem('queue_nowait', false), + (array)$this->config()->getItem('queue_args', null) + ); + return $this; + } + + public function declareExchange() + { + $this->_getChannel()->exchange_declare( + $this->_getExchangeName(), + (string)$this->config()->getItem('exchange_type', 'direct'), + (bool)$this->config()->getItem('exchange_passive', false), + (bool)$this->config()->getItem('exchange_durable', true), + (bool)$this->config()->getItem('exchange_autodelete', false), + (bool)$this->config()->getItem('exchange_internal', false), + (bool)$this->config()->getItem('exchange_nowait', false), + (array)$this->config()->getItem('exchange_args', null) + ); + return $this; + } + + public function bindQueue() + { + $this->_getChannel()->queue_bind( + $this->_getQueueName(), + $this->_getExchangeName(), + $this->_getRoutingKey() + ); + return $this; + } +} diff --git a/tests/Provider/AmqpTest.php b/tests/Provider/AmqpTest.php new file mode 100644 index 0000000..a7ee208 --- /dev/null +++ b/tests/Provider/AmqpTest.php @@ -0,0 +1,160 @@ +declareExchange() + ->declareQueue() + ->bindQueue() + ->push('this is a test'); + + $q->consume( + function ($message, $deliveryTag) use ($q) + { + $this->assertEquals('this is a test', $message); + $q->ack($deliveryTag); + } + ); + } + + public function testBatchAck() + { + $q = AmqpQueueProvider::create('test.batch.ack') + ->declareExchange() + ->declareQueue() + ->bindQueue() + ->purge(); + $q->config()->addItem('wait_time', 1); + + $total = 1000; + + $data = []; + for($i = 0; $i < $total; $i++) + { + $data[] = 'message' . $i; + } + $q->pushBatch($data); + + $count = 0; + while(true) + { + $c = 0; + $q->batchConsume( + function (array $messages) use ($q, &$c) + { + $results = []; + foreach($messages as $tag => $message) + { + $c++; + $results[$tag] = true; + } + $q->batchAck($results); + }, + 250 + ); + $count += $c; + if(!$c) + { + break; + } + } + $this->assertEquals($total, $count); + } + + public function testBatchNack() + { + $q = AmqpQueueProvider::create('test.batch.nack') + ->declareExchange() + ->declareQueue() + ->bindQueue() + ->purge(); + $q->config()->addItem('wait_time', 1); + + $total = 1000; + + $data = []; + for($i = 0; $i < $total; $i++) + { + $data[] = 'message' . $i; + } + $q->pushBatch($data); + + $count = 0; + while(true) + { + $c = 0; + $q->batchConsume( + function (array $messages) use ($q, &$c) + { + $results = []; + foreach($messages as $tag => $message) + { + $c++; + $results[$tag] = false; + } + $q->batchAck($results); + }, + 250 + ); + $count += $c; + if(!$c) + { + break; + } + } + $this->assertEquals($total, $count); + } + + public function testRequeue() + { + $q = AmqpQueueProvider::create('test.batch.requeue') + ->declareExchange() + ->declareQueue() + ->bindQueue() + ->purge(); + $q->config()->addItem('wait_time', 1); + + $total = 250; + + $data = []; + for($i = 0; $i < $total; $i++) + { + $data[] = 'message' . $i; + } + $q->pushBatch($data); + + $q->batchConsume( + function (array $messages) use ($q) + { + $results = []; + foreach($messages as $tag => $message) + { + $results[$tag] = false; + } + $q->batchAck($results, true); + }, + 250 + ); + + $count = 0; + $q->batchConsume( + function (array $messages) use ($q, &$count) + { + $results = []; + foreach($messages as $tag => $message) + { + $count++; + $results[$tag] = true; + } + $q->batchAck($results); + }, + 250 + ); + $this->assertEquals($total, $count); + } +}