From 9a9d5e0385cdb98db99994fd69a6afe6ace99508 Mon Sep 17 00:00:00 2001 From: yunwuxin <448901948@qq.com> Date: Wed, 30 Nov 2016 18:50:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=AE=8C=E5=96=84=20?= =?UTF-8?q?=E6=8F=90=E5=8D=87Queue=E7=9A=84=E5=91=BD=E5=90=8D=E7=A9=BA?= =?UTF-8?q?=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 9 ++- composer.json | 15 +++- src/Queue.php | 84 ++++++-------------- src/common.php | 17 ++++ src/config.php | 11 +-- src/queue/CallQueuedHandler.php | 36 +++++++++ src/queue/Connector.php | 69 ++++++++++++++++ src/{ => queue}/Job.php | 0 src/{ => queue}/Listener.php | 0 src/{ => queue}/Queueable.php | 0 src/{ => queue}/ShouldQueue.php | 0 src/{ => queue}/Worker.php | 0 src/{ => queue}/command/Listen.php | 0 src/{ => queue}/command/Restart.php | 0 src/{ => queue}/command/Subscribe.php | 0 src/{ => queue}/command/Work.php | 0 src/{driver => queue/connector}/Database.php | 11 +-- src/{driver => queue/connector}/Redis.php | 29 +++---- src/{driver => queue/connector}/Sync.php | 9 +-- src/{driver => queue/connector}/Topthink.php | 19 ++--- src/{ => queue}/job/Database.php | 2 +- src/{ => queue}/job/Redis.php | 2 +- src/{ => queue}/job/Sync.php | 0 src/{ => queue}/job/Topthink.php | 2 +- 24 files changed, 195 insertions(+), 120 deletions(-) create mode 100644 src/common.php create mode 100644 src/queue/CallQueuedHandler.php create mode 100644 src/queue/Connector.php rename src/{ => queue}/Job.php (100%) rename src/{ => queue}/Listener.php (100%) rename src/{ => queue}/Queueable.php (100%) rename src/{ => queue}/ShouldQueue.php (100%) rename src/{ => queue}/Worker.php (100%) rename src/{ => queue}/command/Listen.php (100%) rename src/{ => queue}/command/Restart.php (100%) rename src/{ => queue}/command/Subscribe.php (100%) rename src/{ => queue}/command/Work.php (100%) rename src/{driver => queue/connector}/Database.php (96%) rename src/{driver => queue/connector}/Redis.php (93%) rename src/{driver => queue/connector}/Sync.php (88%) rename src/{driver => queue/connector}/Topthink.php (96%) rename src/{ => queue}/job/Database.php (97%) rename src/{ => queue}/job/Redis.php (97%) rename src/{ => queue}/job/Sync.php (100%) rename src/{ => queue}/job/Topthink.php (97%) diff --git a/README.md b/README.md index e52bd73..14ad461 100644 --- a/README.md +++ b/README.md @@ -4,17 +4,18 @@ > composer require topthink/think-queue ## 配置 +> 配置文件位于 `application/extra/queue.php` ### 公共配置 ``` -'queue'=>[ - 'type'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动 +[ + 'connector'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动 //或其他自定义的完整的类名 ] ``` ### 驱动配置 -> 各个驱动的具体可用配置项在`think\queue\driver`目录下各个驱动类里的`options`属性中,写在上面的`queue`配置里即可覆盖 +> 各个驱动的具体可用配置项在`think\queue\connector`目录下各个驱动类里的`options`属性中,写在上面的`queue`配置里即可覆盖 ## 使用 Database @@ -108,7 +109,7 @@ class Job2{ ## 发布任务 -> `think\queue\Queue:push($job, $data = '', $queue = null)` 和 `think\queue\Queue::later($delay, $job, $data = '', $queue = null)` 两个方法,前者是立即执行,后者是在`$delay`秒后执行 +> `think\Queue:push($job, $data = '', $queue = null)` 和 `think\Queue::later($delay, $job, $data = '', $queue = null)` 两个方法,前者是立即执行,后者是在`$delay`秒后执行 `$job` 是任务名 单模块的,且命名空间是`app\job`的,比如上面的例子一,写`Job1`类名即可 diff --git a/composer.json b/composer.json index 1af4a18..d503a82 100644 --- a/composer.json +++ b/composer.json @@ -1,6 +1,7 @@ { "name": "topthink/think-queue", "description": "The ThinkPHP5 Queue Package", + "type": "think-extend", "authors": [ { "name": "yunwuxin", @@ -8,13 +9,21 @@ } ], "license": "Apache-2.0", - "minimum-stability": "dev", "autoload": { "psr-4": { - "think\\queue\\": "src" + "think\\": "src" }, "files": [ - "src/config.php" + "src/common.php" ] + }, + "require": { + "topthink/think-helper": "^1.0", + "topthink/think-installer": ">=1.0.10" + }, + "extra": { + "think-config": { + "queue": "src/config.php" + } } } diff --git a/src/Queue.php b/src/Queue.php index 9359c09..f5d6871 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -9,75 +9,41 @@ // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- -namespace think\queue; - -use think\Config; - +namespace think; + +use think\helper\Str; +use think\queue\Connector; + +/** + * Class Queue + * @package think\queue + * + * @method static push($job, $data = '', $queue = null) + * @method static later($delay, $job, $data = '', $queue = null) + * @method static pop($queue = null) + * @method static marshal() + */ class Queue { - protected static $instance = []; - - /** - * 添加任务到队列 - * @param $job - * @param string $data - * @param null $queue - */ - public static function push($job, $data = '', $queue = null) - { - self::handle()->push($job, $data, $queue); - - } + /** @var Connector */ + protected static $connector; - /** - * 添加延迟任务到队列 - * @param $delay - * @param $job - * @param string $data - * @param null $queue - */ - public static function later($delay, $job, $data = '', $queue = null) - { - self::handle()->later($delay, $job, $data, $queue); - } - - /** - * 获取第一个任务 - * @param null $queue - * @return mixed - */ - public static function pop($queue = null) - { - if (!method_exists(self::handle(), 'pop')) - throw new \RuntimeException('pop queues not support for this type'); - - return self::handle()->pop($queue); - } - - - /** - * 由订阅的推送执行任务 - */ - public static function marshal() - { - if (!method_exists(self::handle(), 'marshal')) - throw new \RuntimeException('push queues not support for this type'); - - self::handle()->marshal(); - } - - private static function handle() + private static function buildConnector() { $options = Config::get('queue'); $type = !empty($options['type']) ? $options['type'] : 'Sync'; - if (!isset(self::$instance[$type])) { + if (!isset(self::$connector)) { - $class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\driver\\' . ucwords($type); + $class = false !== strpos($type, '\\') ? $type : '\\think\\queue\\connector\\' . Str::studly($type); - self::$instance[$type] = new $class($options); + self::$connector = new $class($options); } - return self::$instance[$type]; + return self::$connector; } + public static function __callStatic($name, $arguments) + { + return call_user_func_array([self::buildConnector(), $name], $arguments); + } } \ No newline at end of file diff --git a/src/common.php b/src/common.php new file mode 100644 index 0000000..aa41378 --- /dev/null +++ b/src/common.php @@ -0,0 +1,17 @@ + +// +---------------------------------------------------------------------- + +\think\Console::addDefaultCommands([ + "think\\queue\\command\\Work", + "think\\queue\\command\\Restart", + "think\\queue\\command\\Listen", + "think\\queue\\command\\Subscribe" +]); \ No newline at end of file diff --git a/src/config.php b/src/config.php index aa41378..41fd544 100644 --- a/src/config.php +++ b/src/config.php @@ -2,16 +2,13 @@ // +---------------------------------------------------------------------- // | ThinkPHP [ WE CAN DO IT JUST THINK IT ] // +---------------------------------------------------------------------- -// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved. +// | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved. // +---------------------------------------------------------------------- // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 ) // +---------------------------------------------------------------------- // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- -\think\Console::addDefaultCommands([ - "think\\queue\\command\\Work", - "think\\queue\\command\\Restart", - "think\\queue\\command\\Listen", - "think\\queue\\command\\Subscribe" -]); \ No newline at end of file +return [ + 'connector' => 'Sync' +]; \ No newline at end of file diff --git a/src/queue/CallQueuedHandler.php b/src/queue/CallQueuedHandler.php new file mode 100644 index 0000000..101e3bd --- /dev/null +++ b/src/queue/CallQueuedHandler.php @@ -0,0 +1,36 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue; + +class CallQueuedHandler +{ + + public function call(Job $job, array $data) + { + $command = unserialize($data['command']); + + call_user_func([$command, 'handle']); + + if (!$job->isDeletedOrReleased()) { + $job->delete(); + } + } + + public function failed(array $data, $e) + { + $command = unserialize($data['command']); + + if (method_exists($command, 'failed')) { + $command->failed($e); + } + } +} \ No newline at end of file diff --git a/src/queue/Connector.php b/src/queue/Connector.php new file mode 100644 index 0000000..4295758 --- /dev/null +++ b/src/queue/Connector.php @@ -0,0 +1,69 @@ + +// +---------------------------------------------------------------------- + +namespace think\queue; + +use InvalidArgumentException; + +abstract class Connector +{ + protected $options = []; + + abstract function push($job, $data = '', $queue = null); + + abstract function later($delay, $job, $data = '', $queue = null); + + abstract public function pop($queue = null); + + public function marshal() + { + throw new \RuntimeException('pop queues not support for this type'); + } + + protected function createPayload($job, $data = '', $queue = null) + { + if (is_object($job)) { + $payload = json_encode([ + 'job' => 'think\queue\CallQueuedHandler@call', + 'data' => [ + 'commandName' => get_class($job), + 'command' => serialize(clone $job), + ], + ]); + } else { + $payload = json_encode($this->createPlainPayload($job, $data)); + } + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); + } + + return $payload; + } + + protected function createPlainPayload($job, $data) + { + return ['job' => $job, 'data' => $data]; + } + + protected function setMeta($payload, $key, $value) + { + $payload = json_decode($payload, true); + $payload[$key] = $value; + $payload = json_encode($payload); + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); + } + + return $payload; + } +} \ No newline at end of file diff --git a/src/Job.php b/src/queue/Job.php similarity index 100% rename from src/Job.php rename to src/queue/Job.php diff --git a/src/Listener.php b/src/queue/Listener.php similarity index 100% rename from src/Listener.php rename to src/queue/Listener.php diff --git a/src/Queueable.php b/src/queue/Queueable.php similarity index 100% rename from src/Queueable.php rename to src/queue/Queueable.php diff --git a/src/ShouldQueue.php b/src/queue/ShouldQueue.php similarity index 100% rename from src/ShouldQueue.php rename to src/queue/ShouldQueue.php diff --git a/src/Worker.php b/src/queue/Worker.php similarity index 100% rename from src/Worker.php rename to src/queue/Worker.php diff --git a/src/command/Listen.php b/src/queue/command/Listen.php similarity index 100% rename from src/command/Listen.php rename to src/queue/command/Listen.php diff --git a/src/command/Restart.php b/src/queue/command/Restart.php similarity index 100% rename from src/command/Restart.php rename to src/queue/command/Restart.php diff --git a/src/command/Subscribe.php b/src/queue/command/Subscribe.php similarity index 100% rename from src/command/Subscribe.php rename to src/queue/command/Subscribe.php diff --git a/src/command/Work.php b/src/queue/command/Work.php similarity index 100% rename from src/command/Work.php rename to src/queue/command/Work.php diff --git a/src/driver/Database.php b/src/queue/connector/Database.php similarity index 96% rename from src/driver/Database.php rename to src/queue/connector/Database.php index 0c26d5f..28fb963 100644 --- a/src/driver/Database.php +++ b/src/queue/connector/Database.php @@ -9,12 +9,13 @@ // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- -namespace think\queue\driver; +namespace think\queue\connector; use think\Db; +use think\queue\Connector; use think\queue\job\Database as DatabaseJob; -class Database +class Database extends Connector { protected $db; @@ -44,12 +45,6 @@ public function later($delay, $job, $data = '', $queue = null) return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data)); } - - protected function createPayload($job, $data) - { - return json_encode(['job' => $job, 'data' => $data]); - } - public function pop($queue = null) { $queue = $this->getQueue($queue); diff --git a/src/driver/Redis.php b/src/queue/connector/Redis.php similarity index 93% rename from src/driver/Redis.php rename to src/queue/connector/Redis.php index 86f0a89..2c5a2a1 100644 --- a/src/driver/Redis.php +++ b/src/queue/connector/Redis.php @@ -9,13 +9,14 @@ // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- -namespace think\queue\driver; - +namespace think\queue\connector; use Exception; +use think\helper\Str; +use think\queue\Connector; use think\queue\job\Redis as RedisJob; -class Redis +class Redis extends Connector { /** @var \Redis */ protected $redis; @@ -47,7 +48,7 @@ public function __construct($options) if ('' != $this->options['password']) { $this->redis->auth($this->options['password']); } - + if (0 != $this->options['select']) { $this->redis->select($this->options['select']); } @@ -107,12 +108,11 @@ public function pushRaw($payload, $queue = null) return json_decode($payload, true)['id']; } - - protected function createPayload($job, $data) + protected function createPayload($job, $data = '', $queue = null) { - $payload = json_encode(['job' => $job, 'data' => $data]); - - $payload = $this->setMeta($payload, 'id', $this->getRandomId()); + $payload = $this->setMeta( + parent::createPayload($job, $data), 'id', $this->getRandomId() + ); return $this->setMeta($payload, 'attempts', 1); } @@ -182,7 +182,6 @@ protected function transaction(\Closure $closure) } } - /** * 获取所有到期任务 * @@ -195,7 +194,6 @@ protected function getExpiredJobs($from, $time) return $this->redis->zRangeByScore($from, '-inf', $time); } - /** * 删除过期任务 * @@ -233,14 +231,7 @@ protected function pushExpiredJobsOntoNewQueue($to, $jobs, $attempt = true) */ protected function getRandomId() { - return uniqid(); - } - - protected function setMeta($payload, $key, $value) - { - $payload = json_decode($payload, true); - $payload[$key] = $value; - return json_encode($payload); + return Str::random(32); } /** diff --git a/src/driver/Sync.php b/src/queue/connector/Sync.php similarity index 88% rename from src/driver/Sync.php rename to src/queue/connector/Sync.php index aa721e5..6db4c2c 100644 --- a/src/driver/Sync.php +++ b/src/queue/connector/Sync.php @@ -9,13 +9,14 @@ // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- -namespace think\queue\driver; +namespace think\queue\connector; use Exception; +use think\queue\Connector; use think\queue\job\Sync as SyncJob; use Throwable; -class Sync +class Sync extends Connector { public function push($job, $data = '', $queue = null) @@ -53,8 +54,4 @@ protected function resolveJob($payload) return new SyncJob($payload); } - protected function createPayload($job, $data = '', $queue = null) - { - return json_encode(['job' => $job, 'data' => $data]); - } } \ No newline at end of file diff --git a/src/driver/Topthink.php b/src/queue/connector/Topthink.php similarity index 96% rename from src/driver/Topthink.php rename to src/queue/connector/Topthink.php index 1f417d2..b4a1c4c 100644 --- a/src/driver/Topthink.php +++ b/src/queue/connector/Topthink.php @@ -9,14 +9,15 @@ // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- -namespace think\queue\driver; +namespace think\queue\connector; use think\exception\HttpException; +use think\queue\Connector; use think\Request; use think\queue\job\Topthink as TopthinkJob; use think\Response; -class Topthink +class Topthink extends Connector { protected $options = [ 'token' => '', @@ -195,11 +196,6 @@ protected function reportHttpError($status, $text) throw new HttpException($status, "http error: {$status} | {$text}"); } - protected function createPayload($job, $data = '') - { - return json_encode(['job' => $job, 'data' => $data]); - } - /** * Marshal out the pushed job and payload. * @@ -214,10 +210,6 @@ protected function marshalPushedJob() ]; } - public function getQueue($queue) - { - return $queue ?: $this->options['default']; - } public function __destruct() { @@ -226,4 +218,9 @@ public function __destruct() $this->curl = null; } } + + public function pop($queue = null) + { + throw new \RuntimeException('pop queues not support for this type'); + } } \ No newline at end of file diff --git a/src/job/Database.php b/src/queue/job/Database.php similarity index 97% rename from src/job/Database.php rename to src/queue/job/Database.php index e939a2f..ab6107d 100644 --- a/src/job/Database.php +++ b/src/queue/job/Database.php @@ -11,7 +11,7 @@ namespace think\queue\job; use think\queue\Job; -use think\queue\driver\Database as DatabaseQueue; +use think\queue\connector\Database as DatabaseQueue; class Database extends Job { diff --git a/src/job/Redis.php b/src/queue/job/Redis.php similarity index 97% rename from src/job/Redis.php rename to src/queue/job/Redis.php index e78af05..d56d2b6 100644 --- a/src/job/Redis.php +++ b/src/queue/job/Redis.php @@ -13,7 +13,7 @@ use think\queue\Job; -use think\queue\driver\Redis as RedisQueue; +use think\queue\connector\Redis as RedisQueue; class Redis extends Job { diff --git a/src/job/Sync.php b/src/queue/job/Sync.php similarity index 100% rename from src/job/Sync.php rename to src/queue/job/Sync.php diff --git a/src/job/Topthink.php b/src/queue/job/Topthink.php similarity index 97% rename from src/job/Topthink.php rename to src/queue/job/Topthink.php index 5106190..bab4cf1 100644 --- a/src/job/Topthink.php +++ b/src/queue/job/Topthink.php @@ -13,7 +13,7 @@ use think\queue\Job; -use think\queue\driver\Topthink as TopthinkQueue; +use think\queue\connector\Topthink as TopthinkQueue; class Topthink extends Job {