Skip to content

Latest commit

 

History

History
211 lines (158 loc) · 6.67 KB

工作队列模型(任务队列).md

File metadata and controls

211 lines (158 loc) · 6.67 KB

任务模型

概念

​ 当消息处理比较耗时的时候,可能生产消息的速度大于消费的速度,长此以往,就会导致消息堆积,无法及时处理,此时可以使用任务模型,当多个消费者绑定一个队列,共同消费其中的消息。

image-20211122115837571

代码

  1. 生产者

    /**
     * 生产者
     */
    public class Provider {
    
        /**
         * 生产消息
         */
        @Test
        public void sendMessage() throws IOException, TimeoutException {
    
            //获取连接对象
            Connection connection = RabbitMQUtils.getConnection();
    
            //获取连接中通道
            Channel channel = connection.createChannel();
    
            /**
             * 通过通道声明队列
             * 队列名称 不存在自动创建
             * 是否持久化
             * 是否独占队列
             * 是否在消费完成后自动删除队列
             * 附加参数
             */
            channel.queueDeclare("work",true,false,false,null);
    
            /**
             * 生产消息
             * 1. 交换机名称
             * 2. 队列名称
             * 3. 传递消息额外设置
             * 4. 具体消息
             */
            for (int i = 0; i < 10; i++) {
                channel.basicPublish("","work",null,("我是工作模型中的消息" + i).getBytes());
            }
    
            RabbitMQUtils.close(connection,channel);
        }
    
    }
  2. 消费者1

    /**
     * 消费者1
     */
    public class Customer1 {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //获取连接对象
            Connection connection = RabbitMQUtils.getConnection();
    
            //获取连接中通道
            Channel channel = connection.createChannel();
    
            /**
             * 通道绑定队列
             * 队列名称 不存在自动创建
             * 是否持久化
             * 是否独占队列
             * 是否在消费完成后自动删除队列
             * 附加参数
             */
            channel.queueDeclare("work",true,false,false,null);
    
            /**
             * 消费消息
             * 1. 消费的队列
             * 2. 开始消息的自动确认机制
             * 3. 回调接口 重写该接口的handleDelivery方法 处理消息
             */
            channel.basicConsume("work",true,new DefaultConsumer(channel){
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1消费的消息:"+ new String(body) );
                }
            });
    
        }
    
    }
  3. 消费者2

    /**
     * 消费者2
     */
    public class Customer2 {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //获取连接对象
            Connection connection = RabbitMQUtils.getConnection();
    
            //获取连接中通道
            Channel channel = connection.createChannel();
    
            /**
             * 通道绑定队列
             * 队列名称 不存在自动创建
             * 是否持久化
             * 是否独占队列
             * 是否在消费完成后自动删除队列
             * 附加参数
             */
            channel.queueDeclare("work",true,false,false,null);
    
            /**
             * 消费消息
             * 1. 消费的队列
             * 2. 开始消息的自动确认机制
             * 3. 回调接口 重写该接口的handleDelivery方法 处理消息
             */
            channel.basicConsume("work",true,new DefaultConsumer(channel){
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2消费的消息:"+ new String(body) );
                }
            });
    
        }
    
    }
  4. 先启动连个消费者,然后启动生产者

    image-20211122142932726

    image-20211122142942182

官方总结

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询算法。

  1. 消息自动处理机制

​ 默认情况下,通道是自动确认的,拿到消息之后不论是否消费完毕就会向Rabbit MQ发送确认。这样的话,如上两个消费者有一个有延迟也是平均分配。我们希望有延迟的消费者少消费,没有延迟的消费者多消费。可以关闭其自动确认机制。完成消费之后手动确认即可。

消费者代码

 public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中通道
        Channel channel = connection.createChannel();

        //每次消费一个消息
        channel.basicQos(1);

        /**
         * 通道绑定队列
         * 队列名称 不存在自动创建
         * 是否持久化
         * 是否独占队列
         * 是否在消费完成后自动删除队列
         * 附加参数
         */
        channel.queueDeclare("work",true,false,false,null);

        /**
         * 消费消息
         * 1. 消费的队列
         * 2. 自动确认机制
         * 3. 回调接口 重写该接口的handleDelivery方法 处理消息
         */
        channel.basicConsume("work",false,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2消费的消息:"+ new String(body) );
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });

    }

此时,我们的消费者1加入sleep模拟延迟,消费者2 正常执行。这样的话,结果如下

image-20211122145143763

image-20211122145149910

即达到了我们想要的效果。能者多劳。