From d50fd5beaf7163aa2e7c589008edc19ea951a539 Mon Sep 17 00:00:00 2001 From: yunwuxin <448901948@qq.com> Date: Tue, 12 Jul 2016 17:21:36 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4iron=E9=A9=B1=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 5 +- src/Queue.php | 15 ----- src/driver/Iron.php | 147 -------------------------------------------- src/job/Iron.php | 112 --------------------------------- 4 files changed, 1 insertion(+), 278 deletions(-) delete mode 100644 src/driver/Iron.php delete mode 100644 src/job/Iron.php diff --git a/README.md b/README.md index ac5ce67..26f81a0 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ ``` 'queue'=>[ - 'type'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,iron:Iron.io驱动 + 'type'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动 //或其他自定义的完整的类名 ] ``` @@ -17,9 +17,6 @@ > 各个驱动的具体可用配置项在`think\queue\driver`目录下各个驱动类里的`options`属性中,写在上面的`queue`配置里即可覆盖 -## 使用 Iron -> composer require iron-io/iron_mq 4.* - ## 使用 Database > 创建如下数据表 diff --git a/src/Queue.php b/src/Queue.php index f191b2a..9359c09 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -66,21 +66,6 @@ public static function marshal() self::handle()->marshal(); } - /** - * 订阅 - * @param $name - * @param $url - * @param $queue - * @param array $options - */ - public static function subscribe($name, $url, $queue, array $options = []) - { - if (!method_exists(self::handle(), 'subscribe')) - throw new \RuntimeException('subscribe queues not support for this type'); - - self::handle()->subscribe($name, $url, $queue, $options); - } - private static function handle() { $options = Config::get('queue'); diff --git a/src/driver/Iron.php b/src/driver/Iron.php deleted file mode 100644 index 4fb676d..0000000 --- a/src/driver/Iron.php +++ /dev/null @@ -1,147 +0,0 @@ - -// +---------------------------------------------------------------------- - -namespace think\queue\driver; - - -use Exception; -use IronMQ\IronMQ; -use think\queue\job\Iron as IronJob; -use think\Request; -use think\Response; - -class Iron -{ - /** @var IronMQ */ - protected $iron; - - protected $options = [ - 'token' => '', - 'project_id' => '', - 'protocol' => 'https', - 'host' => 'mq-aws-us-east-1-1.iron.io', - 'port' => '443', - 'api_version' => '3', - 'encryption_key' => '', - 'default' => 'default' - ]; - - /** @var Request */ - protected $request; - - public function __construct($options) - { - if (!empty($options)) { - $this->options = array_merge($this->options, $options); - } - - $this->iron = new IronMQ($this->options); - $this->request = Request::instance(); - - } - - public function push($job, $data = '', $queue = null) - { - return $this->pushRaw($this->createPayload($job, $data, $queue), $queue); - } - - public function later($delay, $job, $data = '', $queue = null) - { - $payload = $this->createPayload($job, $data, $queue); - - return $this->pushRaw($payload, $queue, compact('delay')); - } - - public function pop($queue = null) - { - $queue = $this->getQueue($queue); - - $job = $this->iron->reserveMessage($queue); - - if (!is_null($job)) { - return new IronJob($this, $job); - } - } - - public function recreate($payload, $queue, $delay) - { - return $this->pushRaw($payload, $queue, compact('delay')); - } - - - public function pushRaw($payload, $queue = null, array $options = []) - { - return $this->iron->postMessage($this->getQueue($queue), $payload, $options)->id; - } - - public function getQueue($queue) - { - return $queue ?: $this->options['default']; - } - - protected function createPayload($job, $data = '', $queue = null) - { - $payload = json_encode(['job' => $job, 'data' => $data]); - - $payload = $this->setMeta($payload, 'attempts', 1); - - return $this->setMeta($payload, 'queue', $this->getQueue($queue)); - } - - public function deleteMessage($queue, $id, $reservation_id) - { - $this->iron->deleteMessage($queue, $id, $reservation_id); - } - - public function marshal() - { - $this->createPushedIronJob($this->marshalPushedJob())->fire(); - - return new Response('OK'); - } - - public function subscribe($name, $url, $queue) - { - $this->iron->addSubscriber($queue, ['name' => $name, 'url' => $url]); - } - - /** - * Marshal out the pushed job and payload. - * - * @return object - */ - protected function marshalPushedJob() - { - return (object)[ - 'id' => $this->request->header('iron-message-id'), - 'body' => $this->request->getContent(), - 'reservation_id' => $this->request->header('iron-reservation-id') - ]; - } - - /** - * Create a new IronJob for a pushed job. - * - * @param object $job - * @return IronJob - */ - protected function createPushedIronJob($job) - { - return new IronJob($this, $job, true); - } - - protected function setMeta($payload, $key, $value) - { - $payload = json_decode($payload, true); - $payload[$key] = $value; - return json_encode($payload); - } -} \ No newline at end of file diff --git a/src/job/Iron.php b/src/job/Iron.php deleted file mode 100644 index b5a4233..0000000 --- a/src/job/Iron.php +++ /dev/null @@ -1,112 +0,0 @@ - -// +---------------------------------------------------------------------- - -namespace think\queue\job; - - -use think\queue\Job; -use think\queue\driver\Iron as IronQueue; - -class Iron extends Job -{ - - /** - * The Iron queue instance. - * - * @var IronQueue - */ - protected $iron; - - /** - * The IronMQ message instance. - * - * @var object - */ - protected $job; - - /** - * Indicates if the message was a push message. - * - * @var bool - */ - protected $pushed = false; - - public function __construct(IronQueue $iron, $job, $pushed = false) - { - $this->job = $job; - $this->iron = $iron; - $this->pushed = $pushed; - } - - /** - * Fire the job. - * @return void - */ - public function fire() - { - $this->resolveAndFire(json_decode($this->getRawBody(), true)); - } - - /** - * Get the number of times the job has been attempted. - * @return int - */ - public function attempts() - { - return json_decode($this->job->body, true)['attempts']; - } - - public function delete() - { - parent::delete(); - - if ($this->pushed) { - return; - } - - $this->iron->deleteMessage($this->getQueue(), $this->job->id, $this->job->reservation_id); - } - - public function release($delay = 0) - { - parent::release($delay); - - if (!$this->pushed) { - $this->delete(); - } - - $this->recreateJob($delay); - } - - protected function recreateJob($delay) - { - $payload = json_decode($this->job->body, true); - - $payload['attempts'] = $payload['attempts'] + 1; - - $this->iron->recreate(json_encode($payload), $this->getQueue(), $delay); - } - - - /** - * Get the raw body string for the job. - * @return string - */ - public function getRawBody() - { - return $this->job->body; - } - - public function getQueue() - { - return json_decode($this->job->body, true)['queue']; - } -} \ No newline at end of file