锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. JAVA
  4. RabbitMQ-如何保证消息不丢失

RabbitMQ-如何保证消息不丢失

0
  • JAVA
  • 发布于 2024-08-06
  • 0 次阅读
黄健
黄健

本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net

RabbitMQ 常用于 异步发送,mysql,redis,es 之间的数据同步 ,分布式事务,削峰填谷等.....

在微服务中,rabbitmq 是我们经常用到的消息中间件。它能够异步的在各个业务之中进行消息的接受和发送,那么如何保证 rabbitmq 的消息不丢失就显得尤为重要。

首先要分析问题,我们就要明确 rabbitmq 在什么时候可能会出现消息丢失的情况呢?

我们直接说结果

RabbitMQ 在每个阶段都有可能使消息发生丢失

我们在这里把他们简单归结为三个层面

层面一 :生产者发送消息没有到达交换机或者没有到达绑定的队列。

层面二:RabbitMQ 宕机可能导致的消息的丢失。

层面三:消费者宕机导致消息丢失。

层面一的解决方法常见的是

  1. 生产者确认机制

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 Mq 的过程中丢失,消息发送到 Mq 以后,会返回一个结果给发送者,表示消息的发送成功。

情况一:发送成功 生产者正常发送消息到队列之后会返回一个 publish-confirm ack 这个意思是告诉生产者已经接收到消息了。

情况二:发送失败 这里的发送失败有两种,一种是生产者发送到交换机失败 此时返回 publish-confirm nack  。第二种是生产者发送到队列失败 返回 publish-return ack。

开启生产者确认机制的代码如下 ,在生产者的配置文件中加入以下配置

spring:
  rabbitmq:
    publisher-confirm-type: correlated #开启生产者确认机制
    publisher-returns: true

这里的

package com.itheima.publisher.com.it.heima.config;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
 
/**
 * @Auther: QuJingChuan
 * @Date: 2024/1/13 10:34
 * @Description:
 */
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //配置回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.debug("收到消息return的callback,  {},{},{},{},{}",
                        returnedMessage.getExchange(),
                        returnedMessage.getRoutingKey(),
                        returnedMessage.getMessage(),
                        returnedMessage.getReplyCode(),
                        returnedMessage.getReplyText());
            }
        });
    }
}

第一种是 none:代表关闭 confirm 机制

第二种是 simple:表示同步阻塞并等待 mq 的回执消息,即发送完消息后不能干其他的事情,只能等待 mq 的回执,很显然这样效率很低。

第三种是 correlated:MQ 异步回调方式返回回执消息,即生产者发送完消息后可以干其他的事情,直到接收到 mq 的回执。很明显这种效率要优于第二种。

配置 return callback 的代码如下,每个 RabbitTemplate 只能配置一个 代码如下

@Test
    void testConfirmCallback() throws InterruptedException {
        //创建cd 参数为每次发送消息的id
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //添加confirmCallBack
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                //这种情况一般是运行出现bug,一般不会发生。
                log.error("消息回调失败",ex);
            }
 
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                log.debug("收到confirm callback 回执");
                if (result.isAck()){
                    //消息发送成功
                    log.debug("消息发送成功收到ack");
                }else {
                    //消息发送失败
                    log.debug("消息发送失败收到nack,原因:{}",result.getReason());
                    //TODO 重发消息等业务
                }
            }
        });
 
        rabbitTemplate.convertAndSend("amqp.test","amqptest","hello qjc",correlationData);
 
        Thread.sleep(2000);
    }

Confirm Callback 需要每次发消息的时候都要配置(要制定发消息的 id 方便回执的时候直到是谁发的消息)这里写一个测试类方便大家看。

@Bean
    public DirectExchange simpleExchange(){
        //分别是三个参数 交换机名称 是否持久化 当没有队列绑定时是否自动删除
        return new DirectExchange("qjc.exchange",true,false);
    }

那么我们如何解决这个问题呢
方案一:重发消息 

方案二:记录日志

方案三:保存到数据库中定时发送,发送成功后删除表中的数据。

方案四:交给人工处理。

~ 生产者确认机制需要额外的网络和系统的资源开销,尽量不要使用。

~ 如果业务需要,那么无需开启 publisher-return 机制,因为一般路由失败都是自己业务的原因。

~ 对于 nack 消息可以有限次数的重试,依然失败则记录异常消息。

层面二的解决方法常见的是

  1. 消息持久化

由于 mq 是基于内存存储消息的,那么在 mq 服务宕机等一些情况下可能导致消息的丢失。同时内存空间有限,当消费者出现故障或者处理过慢,会导致消息积压,mq 会对消息做迁移(page out 写入磁盘)从而引发 mq 阻塞。我们将消息存储在磁盘上就避免了这个问题。

一 :持久化交换机。

这里要选择 Durable, 因为 Transient 是临时交换机,当 mq 宕机后会消失。

或

代码展示

@Bean
    public Queue simpleQueue(){
        //springamqp在使用QueueBuilder来创建队列的时候,默认就是持久化的
        return QueueBuilder.durable("qjc.queue").build();
    }

二 :持久化队列。

这个与交换机类似,在此不做赘述。

或

代码展示

Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();

三 :持久化消息。

这里选择 delivery mode 选择 2 ,1 是不持久的。

或

代码展示

@RabbitListener(queuesToDeclare = @Queue(
            name = "lazy.queue",
            durable = "true",
            arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenLazyQueue(String msg){
        log.debug("接收到lazyqueue的消息" + msg);
    }
如果不选择持久化队列,交换机,消息的话我们还有另一种方案

Lazy Queue(惰性队列)

惰性队列的特征如下

~ 接受到消息的时候直接存入磁盘而非内存(内存中只保留最近的消息)

~ 消费者需要消息的时候才会从磁盘中取出数据加载到内存

~ 支持数百万条的消息存储

在 mq3.12 版本后,所有的队列都是 Lazy Queue 模式,无法更改。

如果各位小伙伴的版本低于 3.12 那我这里提供了两种方式创建惰性队列

或

或用注解声明

@RabbitListener(queuesToDeclare = @Queue(
            name = "lazy.queue",
            durable = "true",
            arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenLazyQueue(String msg){
        log.debug("接收到lazyqueue的消息" + msg);
    }
  1. 消费者确认机制

RabbitMQ 支持消费者确认机制,即:当消费者处理消息后可以向 mq 发送 ack 回执,mq 收到消息后会在队列中删除该消息。

SpringAMQP 已经实现了消息确认的功能,并且允许我们通过配置文件选择 ack 的处理方式,有三种方式。

  • none: 不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用  
  • manual: 手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject,存在业务入侵,但更灵活  
  • auto: 自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack.  
    当业务出现异常时,根据异常判断返回不同结果:  
  • 如果是业务异常,会自动返回 nack  
  • 如果是消息处理或校验异常,自动返回 reject

注意我们需要再消费者的配置文件中加入参数

这就是 mq 保证消息不丢失的一些方式和解决方案。

标签: #JAVA 991
相关文章

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

SpringBoot整合异步任务执行 2024-10-08 11:24

同步任务: 同步任务是在单线程中按顺序执行,每次只有一个任务在执行,不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务,无法同时处理多个任务,响应慢,影响用 户体验 异步任务: 异步任务是在多线程中同时执行,多个任务可以并发执行,同时处理多个请求,响应快,资源

springboot kafka多数据源,通过配置动态加载发送者和消费者 2024-10-08 11:24

前言 最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。 准备test kafka本身非常容易上手,如果我们需要单元测试,引入ja

SpringBoot 集成 Redis 2024-10-08 11:24

一:SpringBoot 集成 Redis ①Redis是一个 NoSQL(not only)数据库, 常作用缓存 Cache 使用。 ②Redis是一个中间件、是一个独立的服务器;常用的数据类型: string , hash ,set ,zset , list ③通过Redis客户端可以使用多种语

SpringBoot整合QQ邮箱 2024-10-08 11:24

SpringBoot可以通过导入依赖的方式集成多种技术,这当然少不了我们常用的邮箱,现在本章演示SpringBoot整合QQ邮箱发送邮件…. 下面按步骤进行: 1.获取QQ邮箱授权码 1.1 登录QQ邮箱 1.2 开启SMTP服务 找到下图中的SMTP服务区域,如果当前账号未开启的话自己手动开启。

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.