Skip to content

Commit

Permalink
增加Topthink驱动
Browse files Browse the repository at this point in the history
  • Loading branch information
yunwuxin committed Jul 12, 2016
1 parent 2816e3a commit e6d0771
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public static function later($delay, $job, $data = '', $queue = null)
*/
public static function pop($queue = null)
{
if (!method_exists(self::handle(), 'pop'))
throw new \RuntimeException('pop queues not support for this type');

return self::handle()->pop($queue);
}

Expand Down
231 changes: 231 additions & 0 deletions src/driver/Topthink.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <[email protected]>
// +----------------------------------------------------------------------

namespace think\queue\driver;


use think\exception\HttpException;
use think\Request;
use think\queue\job\Topthink as TopthinkJob;
use think\Response;

class Topthink
{
protected $options = [
'token' => '',
'project_id' => '',
'protocol' => 'http',
'host' => 'qns.topthink.com',
'port' => '80',
'api_version' => '3',
'max_retries' => 3,
'encryption_key' => '',
'default' => 'default'
];

/** @var Request */
protected $request;

protected $url;

protected $curl = null;

protected $last_status;

protected $headers = [];

public function __construct($options)
{
if (!empty($options)) {
$this->options = array_merge($this->options, $options);
}

$this->url = "{$this->options['protocol']}://{$this->options['host']}:{$this->options['port']}/v{$this->options['api_version']}/";

$this->headers['Authorization'] = "Bearer {$this->options['token']}";

$this->request = Request::instance();
}

public function push($job, $data = '', $queue = null)
{
return $this->pushRaw(0, $queue, $this->createPayload($job, $data));
}

public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushRaw($delay, $queue, $this->createPayload($job, $data));
}

public function release($queue, $job, $delay)
{
return $this->pushRaw($delay, $queue, $job->payload, $job->attempts);
}

public function marshal()
{
$job = new TopthinkJob($this, $this->marshalPushedJob(), $this->request->header('topthink-message-queue'));
if ($this->request->header('topthink-message-status') == 'success') {
$job->fire();
} else {
$job->failed();
}
return new Response('OK');
}

public function pushRaw($delay, $queue, $payload, $attempts = 0)
{
$queue_name = $this->getQueue($queue);
$queue = rawurlencode($queue_name);
$url = "projects/{$this->options['project_id']}/queue/{$queue}/message";
$message = [
'payload' => $payload,
'attempts' => $attempts,
'delay' => $delay
];

return $this->apiCall('POST', $url, $message)->id;
}

public function deleteMessage($queue, $id)
{
$queue = rawurlencode($queue);
$url = "projects/{$this->options['project_id']}/queue/{$queue}/message/{$id}";
return $this->apiCall('DELETE', $url);
}

protected function apiCall($type, $url, $params = [])
{
$url = "{$this->url}$url";

if ($this->curl == null) {
$this->curl = curl_init();
}

switch ($type = strtoupper($type)) {
case 'DELETE':
curl_setopt($this->curl, CURLOPT_URL, $url);
curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
break;
case 'PUT':
curl_setopt($this->curl, CURLOPT_URL, $url);
curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
break;
case 'POST':
curl_setopt($this->curl, CURLOPT_URL, $url);
curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
curl_setopt($this->curl, CURLOPT_POST, true);
curl_setopt($this->curl, CURLOPT_POSTFIELDS, $params);
break;
case 'GET':
curl_setopt($this->curl, CURLOPT_POSTFIELDS, null);
curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
curl_setopt($this->curl, CURLOPT_HTTPGET, true);
$url .= '?' . http_build_query($params);
curl_setopt($this->curl, CURLOPT_URL, $url);
break;
}

curl_setopt($this->curl, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($this->curl, CURLOPT_RETURNTRANSFER, true);

$headers = [];
foreach ($this->headers as $k => $v) {
if ($k == 'Connection') {
$v = 'Close';
}
$headers[] = "$k: $v";
}

curl_setopt($this->curl, CURLOPT_HTTPHEADER, $headers);
curl_setopt($this->curl, CURLOPT_CONNECTTIMEOUT, 10);

return $this->callWithRetries();
}

protected function callWithRetries()
{
for ($retry = 0; $retry < $this->options['max_retries']; $retry++) {
$out = curl_exec($this->curl);
if ($out === false) {
$this->reportHttpError(0, curl_error($this->curl));
}
$this->last_status = curl_getinfo($this->curl, CURLINFO_HTTP_CODE);

if ($this->last_status >= 200 && $this->last_status < 300) {
return self::jsonDecode($out);
} elseif ($this->last_status >= 500) {
self::waitRandomInterval($retry);
} else {
$this->reportHttpError($this->last_status, $out);
}
}
$this->reportHttpError($this->last_status, "Service unavailable");
return null;
}

protected static function jsonDecode($response)
{
$data = json_decode($response);

$json_error = json_last_error();
if ($json_error != JSON_ERROR_NONE) {
throw new \RuntimeException($json_error);
}

return $data;
}

protected static function waitRandomInterval($retry)
{
$max_delay = pow(4, $retry) * 100 * 1000;
usleep(rand(0, $max_delay));
}

protected function reportHttpError($status, $text)
{
throw new HttpException("http error: {$status} | {$text}", $status);
}

protected function createPayload($job, $data = '')
{
return json_encode(['job' => $job, 'data' => $data]);
}

/**
* Marshal out the pushed job and payload.
*
* @return object
*/
protected function marshalPushedJob()
{
return (object)[
'id' => $this->request->header('topthink-message-id'),
'payload' => $this->request->getContent(),
'attempts' => $this->request->header('topthink-message-attempts')
];
}

public function getQueue($queue)
{
return $queue ?: $this->options['default'];
}

public function __destruct()
{
if ($this->curl != null) {
curl_close($this->curl);
$this->curl = null;
}
}
}
86 changes: 86 additions & 0 deletions src/job/Topthink.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <[email protected]>
// +----------------------------------------------------------------------

namespace think\queue\job;


use think\queue\Job;
use think\queue\driver\Topthink as TopthinkQueue;

class Topthink extends Job
{

/**
* The Iron queue instance.
*
* @var TopthinkQueue
*/
protected $topthink;

/**
* The IronMQ message instance.
*
* @var object
*/
protected $job;

public function __construct(TopthinkQueue $topthink, $job, $queue)
{
$this->topthink = $topthink;
$this->job = $job;
$this->queue = $queue;
$this->job->attempts = $this->job->attempts + 1;
}

/**
* Fire the job.
* @return void
*/
public function fire()
{
$this->resolveAndFire(json_decode($this->job->payload, true));
}

/**
* Get the number of times the job has been attempted.
* @return int
*/
public function attempts()
{
return (int)$this->job->attempts;
}

public function delete()
{
parent::delete();

$this->topthink->deleteMessage($this->queue, $this->job->id);
}

public function release($delay = 0)
{
parent::release($delay);

$this->delete();

$this->topthink->release($this->queue, $this->job, $delay);
}

/**
* Get the raw body string for the job.
* @return string
*/
public function getRawBody()
{
return $this->job->payload;
}

}

0 comments on commit e6d0771

Please sign in to comment.