【RabbitMQ】死信队列、延迟队列

news/2024/9/21 1:51:11 标签: RabbitMQ, 死信队列, 延迟队列

死信队列

死信,简单理解就是因为种种原因,无法被消费的消息。

有死信,自然就有死信队列。当一个消息在一个队列中变成死信消息之后,就会被重新发送到另一个交换器中,这个交换器就是DLX(Dead Letter Exchange),绑定该交换器的队列,就被称为死信队列DLQ(Dead Letter Queue)。

消息变成死信消息一般是由于以下几条:

  • 队列达到最大长度
  • 消息过期
  • 消息被拒绝,即消息确认机中手动确认的两种拒绝情况,并且不允许重新入队

队列达到最大长度

spring:
  rabbitmq:
    host: 43.138.108.125
    port: 5672
    username: admin
    password: admin
    virtual-host: mq-springboot-test
@Configuration
public class DeadConfig {

    // 正常队列,当正常队列中的消息出现一些不确定情况时,消息就会进入死信交换机中

    @Bean("normalQueue")
    public Queue normalQueue() {
        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
                .deadLetterExchange(Constants.DEAD_EXCHANGE) // 设置死信交换机
                .deadLetterRoutingKey("dead") // 设置死信队列的路由键为dead
                .maxLength(3) // 设置队列的最大长度为3
                .build();
    }

    @Bean("normalExchange")
    public Exchange normalExchange() {
        return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();
    }

    @Bean("normalQueueBind")
    public Binding normalQueueBind(@Qualifier("normalQueue") Queue queue,
                                   @Qualifier("normalExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
    }

    // 死信队列

    @Bean("deadQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(Constants.DEAD_QUEUE).build();
    }

    @Bean("deadExchange")
    public Exchange deadExchange() {
        return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();
    }

    @Bean("deadQueueBind")
    public Binding deadQueueBind(@Qualifier("deadQueue") Queue queue,
                                 @Qualifier("deadExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("dead").noargs();
    }

}
@RestController
@RequestMapping("/dead")
public class DeadController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void deadQueue() {
        this.rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "hello 死信");
        System.out.println("正常队列发送消息成功");
    }

}
@Configuration
public class DeadListener {

    @RabbitListener(queues = Constants.DEAD_QUEUE)
    public void deadListener(String msg) {
        System.out.println("死信队列接收到消息:" + msg);
    }

}

在上述代码中,主要内容是声明了正常队列、交换机和绑定关系以及声明死信队列、死信交换机以及其绑定关系、正常队列的生产者代码、死亡队列的消费者代码。

队列达到最大长度和死信消息要转发到的DLX和路由键都是由正常队列在声明时进行绑定的。

启动上述程序之后,当正常队列存在三条消息之时,假设再来消息,那么消息就要进入死信交换机,从而路由到死信队列了。如下图可以看出,当发送第四条消息之后,死信队列的消费者就消费了一条消息:

在上述图片中,D表示队列是持久化的,Lim表示队列有最大长度,DLX表示队列存在死信交换机、DLK表示队列存在路由键。把鼠标放在这些字母上方,详细的消息都会表示。

在下述代码中,主要是对上述代码改进之后地方的指出,并没有把所有的代码全部给出。

消息过期

消息过期分为两种,一种是设置队列过期时间让消息过期,另一种是设置消息过期时间让消息过期,都可以进行测试。

设置队列过期时间

    @Bean("normalQueue")
    public Queue normalQueue() {
        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
                .deadLetterExchange(Constants.DEAD_EXCHANGE) // 设置死信交换机
                .deadLetterRoutingKey("dead") // 设置死信队列的路由键为dead
//                .maxLength(3) // 设置队列的最大长度为3
                .ttl(5 * 1000) // 设置队列的过期时间为5秒
                .build();
    }

 由上图以及结合代码可以看出,将消息由正常生产者发送给Broker之后,大概5秒钟之后,消息过期。此时消息就会发送给死信交换机,从而交给其对应的消费者消费。

设置消息的过期时间

@Slf4j
@RestController
@RequestMapping("/dead")
public class DeadController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void deadQueue() {
        // 设置消息的过期时间
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                return message;
            }
        };
        this.rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "hello 死信", messagePostProcessor);
        log.info("死信队列发送成功");
    }

}

同样,结合上图和代码来说,19秒的时候消息发送功,24秒的时候死信消费者消费消息成功。

消息被拒绝

spring:
  rabbitmq:
    host: 43.138.108.125
    port: 5672
    username: admin
    password: admin
    virtual-host: mq-springboot-test
    listener:
      simple:
        acknowledge-mode: manual # 消息确认机制,手动确认
@Slf4j
@Configuration
public class DeadListener {

    // 正常队列接收消息
    @RabbitListener(queues = Constants.NORMAL_QUEUE)
    public void normalListener(Channel channel, String msg, Message message) throws IOException {
        try {
            log.info("正常队列监听器接收消息:{}", msg);
            int num = 3 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            log.error("正常队列监听器接收消息异常:{}", e.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    // 死信队列接收消息
    @RabbitListener(queues = Constants.DEAD_QUEUE)
    public void deadListener(String msg, Channel channel, Message message) throws IOException {
        try {
            log.info("死信队列监听器接收消息:{}", msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

}

由上图以及代码可以看到,当消息的确认机制是手动确认时,当出现异常并且拒绝消息重新入队以后,消息就会来到死信队列中。

使用场景

用户支付订单之后,支付系统会给订单系统返回当前订单的支付状态。为了保证支付信息不丢失,需要使用到死信队列机制。当消息消费异常时,将消息投入到死信队列,由订单系统的其他消费者来监听这个队列,并对数据进行处理(比如发送工单等,进行人工确认)。

消息重试:将死信消息发送到原队列或另一个队列进行重试处理。

消息丢弃:直接丢弃这些无法处理的消息,避免占用系统资源。

日志收集:将死信消息做为日志收集起来,用户后续分析和问题定位。

延迟队列

概念

延迟队列就是消息发送之后,并不想让消费者立即拿到消息,而是在等待特定时间之后,消费者才能拿到消息进行消费

应用场景

  1. 用户发起退款后,24小时内商家未处理,默认退款
  2. 用户注册成功后,三天后发送短信,提高用户活跃度
  3. 预定会议后,在会议开始前15分钟提醒众人参加会议
  4. 用户通过手机远程遥控家里的智能设备在指定时间进行工作,这就可以使用延迟队列。用户发送消息到延迟队列,当指定时间到了再将指令推送到智能设备。

实现方法

  1. RabbitMQ本身并没有实现延迟队列,因此可以使用TTL + 死信队列的方式来实现延迟队列
  2. 安装延迟队列插件来实现延迟队列

TTL + 死信队列

@Configuration
public class MockDelayConfig {

    @Bean("mockDelayNormalQueue")
    public Queue mockDelayNormalQueue() {
        return QueueBuilder.durable(Constants.MOCk_DELAY_NORMAL_QUEUE)
                .ttl(5000 * 10) // 设置消息过期时间为50秒
                .deadLetterExchange(Constants.MOCK_DELAY_DEAD_EXCHANGE) // 设置死信交换机
                .deadLetterRoutingKey("mock.delay.dead") // 设置死信路由键
                .build();
    }

    @Bean("mockDelayNormalExchange")
    public Exchange mockDelayNormalExchange() {
        return ExchangeBuilder.directExchange(Constants.MOCk_DELAY_NORMAL_EXCHANGE).durable(true).build();
    }

    @Bean("mockDelayNormalQueueBind")
    public Binding mockDelayNormalQueueBind(@Qualifier("mockDelayNormalQueue") Queue queue,
                                           @Qualifier("mockDelayNormalExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("mock.delay.normal").noargs();
    }

    @Bean("mockDelayDeadQueue")
    public Queue mockDelayDeadQueue() {
        return QueueBuilder.durable(Constants.MOCK_DELAY_DEAD_QUEUE).build();
    }

    @Bean("mockDelayDeadExchange")
    public Exchange mockDelayDeadExchange() {
        return ExchangeBuilder.directExchange(Constants.MOCK_DELAY_DEAD_EXCHANGE).durable(true).build();
    }

    @Bean("mockDelayDeadQueueBind")
    public Binding mockDelayDeadQueueBind(@Qualifier("mockDelayDeadQueue") Queue queue,
                                         @Qualifier("mockDelayDeadExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("mock.delay.dead").noargs();
    }

}
@Slf4j
@RestController
@RequestMapping("/mockDelay")
public class MockDelayController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void mockDelayQueue() {
        this.rabbitTemplate.convertAndSend(Constants.MOCk_DELAY_NORMAL_EXCHANGE,
                "mock.delay.normal", "hello 延迟队列");
        log.info("延迟队列生产者发送成功");
    }

}
@Slf4j
@Configuration
public class MockDelayListener {

    @RabbitListener(queues = Constants.MOCK_DELAY_DEAD_QUEUE)
    public void mockDelayListener(String msg) {
        log.info("模拟延迟队列消费者接收到消息:" + msg);
    }

}

在上述代码中,实现的功能是生产者发送消息后,消费者在50秒之后获得消息,对消息进行消费:

在TTL一文中,已经说明了RabbitMQ只会检查队首消息是否过期,不会扫描整个队列。因此如果想要放在模拟延迟队列中的消息过期时间不一致,那就会出现死信消息无法被及时处理的情况。因此,我们想要模拟实现延迟队列,就要确保队列中所有消息的过期时间是一致的。如果存在时间不一致的情况,我们就可以使用不同的模拟延迟队列来实现。

延迟队列插件

下载插件:官方网站进行下载(注意版本对应关系)

启动插件

rabbitma-plusins list // 查看插件列表

rabbitmq-plugins enable rabbitmq_delayed_message_exchange // 启动插件

service rabbitmq-server restart # 重启服务

如下图,当交换机中有了x-delayed-message就表示延迟插件安装成功 

代码测试

@Configuration
public class DelayConfig {

    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
    }

    @Bean("delayExchange")
    public Exchange delayExchange() {
        return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE)
                .delayed() // 延迟交换机
                .durable(true) // 持久化
                .build();
    }

    @Bean("delayQueueBind")
    public Binding delayQueueBind(@Qualifier("delayQueue") Queue delayQueue,
                                   @Qualifier("delayExchange") Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();
    }

}
@Slf4j
@RestController
@RequestMapping("/delay")
public class DelayController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void delayQueue() {
        for(int i = 0; i < 5; i++) {
            // 随机生成延迟时间
            Random random = new Random();
            int time = random.nextInt(20);
            // 消息处理器,设置延迟时间
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setDelayLong((long) (time * 1000)); // 设置延迟时间
                    return message;
                }
            };
            // 发送消息到延迟队列
            this.rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "hello 延迟队列 " + i, messagePostProcessor);
            log.info("发送延迟队列第" + i + "消息成功,延迟时间为:" + time);
        }
    }

}
@Slf4j
@Configuration
public class DelayListener {

    @RabbitListener(queues = Constants.DELAY_QUEUE)
    public void delayListener(String msg) {
        log.info("延迟队列监听器,接收到的消息:{}", msg);
    }

}

本质上,延迟插件就是让消息停留在交换机中,等到延迟时间结束之后,再发送到对应的队列中去。 

两者对比

使用TTL + 死信队列的好处是不需要额外安装插件。缺点是受消息的延迟时间影响,同一个队列中的消息必须延迟时间相同。

使用延迟队列插件的好处是不受延迟时间影响,同一队列中的所有消息延迟时间可以不同,额外的插件使得延迟队列的实现比较容易。缺点是需要依赖特定的插件,并且插件的版本必须和对应的RabbitMQ相对应。


http://www.niftyadmin.cn/n/5667985.html

相关文章

macos清理垃圾桶时提示 “操作无法完成,因为该项目正在使用中” 解决方法 , 强制清理mac废纸篓 方法

在macos中&#xff0c;删除文件后&#xff0c; 在清理垃圾桶时提示 “操作无法完成&#xff0c;因为该项目正在使用中” 出现这个提示&#xff0c;在大多数的情况下是因为数据问题导致&#xff0c;需要通过磁盘管理工具进行修复&#xff0c;修复后才可彻底的清理垃圾桶。 另外…

【目标检测】labelimg图像标注软件的使用流程

一、labelimg检测图片标注 1、下载labelimg.exe 链接&#xff1a;https://pan.baidu.com/s/1yk8ff56Xu40-ZLBghEQ5nw 提取码&#xff1a;vj8f 下载的文件是编译好的&#xff0c;可执行的labelImg.exe文件。直接将文件放在windows环境下&#xff0c;双击可执行。&#xff08;如果…

Redis-Redis的五种数据结构及使用场景

Redis的数据结构有&#xff1a; String(字符串)&#xff1a;可以用来做最简单的数据缓存&#xff0c;可以缓存某个简单的字符串&#xff0c;也可以缓存某个json格式的字符串&#xff0c;Redis分布式锁的实现就利用了这种数据结构&#xff0c;还包括可以实现计数器、Session共享…

边缘计算网关:连接中心计算与边缘设备的重要桥梁-天拓四方

一、边缘计算网关&#xff1a;重新定义信息高速公路的“路标” 边缘计算网关&#xff0c;作为边缘计算生态系统中的核心组件&#xff0c;不仅承载着数据传输的功能&#xff0c;更是智能信息处理的关键节点。它通过分布式计算架构&#xff0c;将数据处理任务前置到网络边缘&…

大数据时代的等保测评:数据安全与隐私保护

在大数据时代&#xff0c;等保测评&#xff08;信息安全等级保护测评&#xff09;对于数据安全与隐私保护具有至关重要的意义。随着大数据技术的飞速发展&#xff0c;数据已成为企业最宝贵的资产之一&#xff0c;但同时也带来了前所未有的安全挑战。以下是对大数据时代等保测评…

YOLOv8改进系列,YOLOv8替换主干网络为PP-HGNetV2(百度飞桨视觉团队自研,助力涨点)

摘要 PP-HGNetV2(High Performance GPU Network V2) 是百度飞桨视觉团队自研的 PP-HGNet 的下一代版本,其在 PP-HGNet 的基础上,做了进一步优化和改进,最终在 NVIDIA GPU 设备上,将 “Accuracy-Latency Balance” 做到了极致,精度大幅超过了其他同样推理速度的模型。其在…

OpenHarmony(鸿蒙南向开发)——小型系统内核(LiteOS-A)【扩展组件】上

往期知识点记录&#xff1a; 鸿蒙&#xff08;HarmonyOS&#xff09;应用层开发&#xff08;北向&#xff09;知识点汇总 鸿蒙&#xff08;OpenHarmony&#xff09;南向开发保姆级知识点汇总~ 子系统开发内核 轻量系统内核&#xff08;LiteOS-M&#xff09; 轻量系统内核&#…

如何理解变量提升和函数提升

在 JavaScript 中&#xff0c;变量提升和函数提升是指在代码执行之前&#xff0c;变量和函数声明会被提升到其所在作用域的顶部。这意味着你可以在声明之前使用它们&#xff0c;但它们的行为有所不同。 变量提升 变量提升是指变量声明&#xff08;使用 var&#xff09;会被提…