队列与交换机的绑定,不能是任意绑定了,而是要指定一个路由key 消息的发送方在 向 交换机发送消息时,也必须指定消息的 路由key。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的路由key进行判断, 只有队列的路由key与消息的 路由key完全一致,才会接收到消息
生产者
//获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//将通道声明为交换机 交换机名称 类型 direct表示路由模式
channel.exchangeDeclare("logs_direct","direct");
//指定路由key
String routincKey = "info";
/**
* 生产消息
* 1. 交换机名称
* 2. 队列名称
* 3. 传递消息额外设置
* 4. 具体消息
*/
for (int i = 1; i <= 10; i++) {
channel.basicPublish("logs_direct",routincKey,null,("direct模型基于route key:" + routincKey + "的消息" + i).getBytes());
}
RabbitMQUtils.close(connection,channel);
消费者 error
//获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//为通道绑定交换机 交换机名称 类型
channel.exchangeDeclare("logs_direct","direct");
//创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
//基于route key 绑定队列和路由key
channel.queueBind(queueName,"logs_direct","error");
/**
* 消费消息
* 1. 消费的队列
* 2. 开始消息的自动确认机制
* 3. 回调接口 重写该接口的handleDelivery方法 处理消息
*/
channel.basicConsume(queueName,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("error模块 消费的消息:"+ new String(body) );
//手动确认 参数一 消息标志 参数2 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
消费者 info error warning
//获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//为通道绑定交换机 交换机名称 类型 fanout表示广播
channel.exchangeDeclare("logs_direct","direct");
//交换机绑定临时队列
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"logs_direct","info");
channel.queueBind(queueName,"logs_direct","error");
channel.queueBind(queueName,"logs_direct","warning");
/**
* 消费消息
* 1. 消费的队列
* 2. 开始消息的自动确认机制
* 3. 回调接口 重写该接口的handleDelivery方法 处理消息
*/
channel.basicConsume(queueName,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("日志模块消费的消息:"+ new String(body) );
//手动确认 参数一 消息标志 参数2 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
生产info路由key的消息 发现只匹配error路由key的消费者1并没有消费
生产error路由key的消息 发现两个消费者都消费了