-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
143 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
<?php | ||
/** | ||
* - Start this consumer in one window by calling: php demo/basic_nack.php | ||
* - Then on a separate window publish a message like this: php demo/amqp_publisher.php good | ||
* that message should be "ack'ed" | ||
* - Then publish a message like this: php demo/amqp_publisher.php bad | ||
* that message should be "nack'ed" | ||
*/ | ||
include(__DIR__ . '/../config.php'); | ||
require_once __DIR__ . '/../vendor/autoload.php'; | ||
|
||
use PhpAmqpLib\Connection\AMQPStreamConnection; | ||
use PhpAmqpLib\Exchange\AMQPExchangeType; | ||
|
||
$exchange = 'router'; | ||
$queue = 'msgs'; | ||
$consumerTag = 'consumer'; | ||
|
||
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); | ||
$channel = $connection->channel(); | ||
|
||
$channel->queue_declare($queue, false, true, false, false); | ||
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); | ||
$channel->queue_bind($queue, $exchange); | ||
|
||
/** | ||
* @param \PhpAmqpLib\Message\AMQPMessage $message | ||
*/ | ||
function process_message($message) | ||
{ | ||
if ($message->body == 'good') { | ||
$message->ack(); | ||
} else { | ||
echo "成功收到消息,消息内容为:".$message->body ; | ||
echo "将消息打回,重回队列:"; | ||
$message->nack(true); | ||
} | ||
|
||
// Send a message with the string "quit" to cancel the consumer. | ||
if ($message->body === 'quit') { | ||
$message->getChannel()->basic_cancel($message->getConsumerTag()); | ||
} | ||
} | ||
|
||
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message'); | ||
|
||
/** | ||
* @param \PhpAmqpLib\Channel\AMQPChannel $channel | ||
* @param \PhpAmqpLib\Connection\AbstractConnection $connection | ||
*/ | ||
function shutdown($channel, $connection) | ||
{ | ||
$channel->close(); | ||
$connection->close(); | ||
} | ||
|
||
register_shutdown_function('shutdown', $channel, $connection); | ||
|
||
// Loop as long as the channel has callbacks registered | ||
while ($channel->is_consuming()) { | ||
$channel->wait(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# Requeue |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
<?php | ||
include(__DIR__ . '/../config.php'); | ||
use PhpAmqpLib\Wire\AMQPTable; | ||
use PhpAmqpLib\Message\AMQPMessage; | ||
use PhpAmqpLib\Exchange\AMQPExchangeType; | ||
use PhpAmqpLib\Connection\AMQPStreamConnection; | ||
|
||
/** | ||
* 死信队列测试 | ||
* 1、创建两个交换器 exchange.normal 和 exchange.dlx, 分别绑定两个队列 queue.normal 和 queue.dlx | ||
* 2、把 queue.normal 队列里面的消息配置过期时间,然后通过 x-dead-letter-exchange 指定死信交换器为 exchange.dlx | ||
* 3、发送消息到 queue.normal 中,消息过期之后流入 exchange.dlx,然后路由到 queue.dlx 队列中,进行消费 | ||
*/ | ||
|
||
// todo 更改配置 | ||
//$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/'); | ||
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); | ||
|
||
$channel = $connection->channel(); | ||
|
||
$channel->exchange_declare('exchange.dlx', AMQPExchangeType::DIRECT, false, true); | ||
$channel->exchange_declare('exchange.normal', AMQPExchangeType::FANOUT, false, true); | ||
$args = new AMQPTable(); | ||
// 消息过期方式:设置 queue.normal 队列中的消息10s之后过期 | ||
$args->set('x-message-ttl', 10000); | ||
// 设置队列最大长度方式: x-max-length | ||
//$args->set('x-max-length', 1); | ||
$args->set('x-dead-letter-exchange', 'exchange.dlx'); | ||
$args->set('x-dead-letter-routing-key', 'routingkey'); | ||
$channel->queue_declare('queue.normal', false, true, false, false, false, $args); | ||
$channel->queue_declare('queue.dlx', false, true, false, false); | ||
|
||
$channel->queue_bind('queue.normal', 'exchange.normal'); | ||
$channel->queue_bind('queue.dlx', 'exchange.dlx', 'routingkey'); | ||
$message = new AMQPMessage('Hello DLX Message'); | ||
$channel->basic_publish($message, 'exchange.normal', 'rk'); | ||
|
||
$channel->close(); | ||
$connection->close(); |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
<?php | ||
include(__DIR__ . '/config.php'); | ||
use PhpAmqpLib\Wire\AMQPTable; | ||
use PhpAmqpLib\Message\AMQPMessage; | ||
use PhpAmqpLib\Exchange\AMQPExchangeType; | ||
use PhpAmqpLib\Connection\AMQPStreamConnection; | ||
|
||
/** | ||
* 死信队列测试 | ||
* 1、创建两个交换器 exchange.normal 和 exchange.dlx, 分别绑定两个队列 queue.normal 和 queue.dlx | ||
* 2、把 queue.normal 队列里面的消息配置过期时间,然后通过 x-dead-letter-exchange 指定死信交换器为 exchange.dlx | ||
* 3、发送消息到 queue.normal 中,消息过期之后流入 exchange.dlx,然后路由到 queue.dlx 队列中,进行消费 | ||
*/ | ||
|
||
// todo 更改配置 | ||
//$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/'); | ||
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); | ||
|
||
$channel = $connection->channel(); | ||
|
||
$channel->exchange_declare('exchange.dlx', AMQPExchangeType::DIRECT, false, true); | ||
$channel->exchange_declare('exchange.normal', AMQPExchangeType::FANOUT, false, true); | ||
$args = new AMQPTable(); | ||
// 消息过期方式:设置 queue.normal 队列中的消息10s之后过期 | ||
$args->set('x-message-ttl', 10000); | ||
// 设置队列最大长度方式: x-max-length | ||
//$args->set('x-max-length', 1); | ||
$args->set('x-dead-letter-exchange', 'exchange.dlx'); | ||
$args->set('x-dead-letter-routing-key', 'routingkey'); | ||
$channel->queue_declare('queue.normal', false, true, false, false, false, $args); | ||
$channel->queue_declare('queue.dlx', false, true, false, false); | ||
|
||
$channel->queue_bind('queue.normal', 'exchange.normal'); | ||
$channel->queue_bind('queue.dlx', 'exchange.dlx', 'routingkey'); | ||
$message = new AMQPMessage('Hello DLX Message'); | ||
$channel->basic_publish($message, 'exchange.normal', 'rk'); | ||
|
||
$channel->close(); | ||
$connection->close(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
# Dead-Letter-Exchange | ||
|