锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. JAVA
  4. SpringBoot 集成 RabbitMQ

SpringBoot 集成 RabbitMQ

0
  • JAVA
  • 发布于 2024-08-16
  • 13 次阅读
黄健
黄健

本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net

概要

SpringBoot 集成 RabbitMQ

整体架构流程

1、安装并运行 RabbitMQ 服务器
2、创建 Spring Boot 项目并添加依赖
3、配置 RabbitMQ
4、生产者、消费者、队列、交换机和路由键

技术细节

Docker 安装 RabbitMQ

docker pull rabbitmq:3-management

运行 RabbitMQ 容器

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Spring Boot 项目添加依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

配置 RabbitMQ

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual #将消息确认模式设置为手动

配置队列、交换机和路由键

@Configuration
public class RabbitMQConfig {
	/**
     * 定义主题交换机
     * 根据消息的路由键和绑定的路由模式进行消息路由。
     */
    @Bean
    public Exchange topicExchange() {
        return new TopicExchange("topic.exchange", true, false);
    }
    /**
     * 绑定主题交换机
     */
	@Bean
	public Binding queueBindingTopic() {
	    return BindingBuilder.bind("your-queue").to(topicExchange()).with("topic.#");
	}
    /**
     * 定义扇形交换机
     * 会将收到的所有消息广播到与其绑定的所有队列。
	 * 不管消息的路由键是什么,所有绑定的队列都会收到相同的消息
     */
    @Bean
	public FanoutExchange fanoutExchange() {
	    return new FanoutExchange("fanout.exchange");
	}
	/**
     * 绑定扇形交换机
     */
	@Bean
	public Binding queueBindingFanout() {
	    return BindingBuilder.bind("your-queue").to(fanoutExchange());
	}
    /**
     * 定义直连交换机
     * 直连交换机 (Direct Exchange) 是一种 RabbitMQ 交换机类型,它根据消息的路由键路由消息。
	 * 消息会附带特定的路由键发送到交换机。
	 * 然后交换机尝试找到一个与其绑定的队列,并具有匹配的路由键。
	 * 如果找到匹配的队列,则消息将被投递到该队列。
     */
    @Bean
	public DirectExchange directExchange() {
	    return new DirectExchange("direct.exchange");
	}
	/**
     * 绑定直连交换机
     */
	@Bean
	public Binding queueBindingDirect() {
	    return BindingBuilder.bind("your-queue").to(directExchange()).with("routing.key");
	}
    /**
    * 优先级队列
    */
    @Bean
	public Queue priorityQueue() {
	    Map<String, Object> args = new HashMap<>();
	    args.put("x-max-priority", 10);
	    return new Queue("priority.queue", true, false, false, args);
	}
	/**
	* 持久化队列
	*/
	@Bean
	public Queue durableQueue() {
	    return new Queue("durable.queue", true);
	}
	/**
	* 消息 TTL(Time to Live)
	*/
	@Bean
	public Queue ttlQueue() {
	    Map<String, Object> args = new HashMap<>();
	    args.put("x-message-ttl", 60000); // 60秒
	    return new Queue("ttl.queue", true, false, false, args);
	}
	/**
     * 延时队列
     * 需要安装RabbitMQ的延时消息插件(RabbitMQ Delayed Message Plugin)
     */
    @Bean
    public Queue delayQueue() {
        //延时队列的消息过期了,会自动触发消息的转发,
        //根据routingKey发送到指定的exchange中,exchange路由到死信队列
        Map<String, Object> args = new HashMap<>();
        //表示队列的最大长度为 1000 条消息。超过此限制的消息将会被丢弃。
        args.put("x-max-length", 1000);
        args.put("x-dead-letter-exchange", topicExchange());
        // 死信路由Key
        args.put("x-dead-letter-routing-key", "dlx.close"); 
        // 单位:毫秒,1分钟测试使用
        args.put("x-message-ttl", 60000); 
        return new Queue("delay.queue", true, false, false, args);
    }
    /**
     * 延时队列绑定交换机
     */
    @Bean
    public Binding delayQueueBinding() {
    	return BindingBuilder.bind(delayQueue()).to(topicExchange()).with("delay.create");
    }
	/**
	* 死信队列
	*/
	@Bean
	public Queue dlxQueue() {
	    Map<String, Object> args = new HashMap<>();
	    args.put("x-dead-letter-exchange", "dlx.exchange");
	    return new Queue("dlx.queue", true, false, false, args);
	}
	/**
     * 死信队列绑定交换机
     */
    @Bean
    public Binding dlxQueueBinding() {
    	return BindingBuilder.bind(dlxQueue()).to(directExchange()).with("dlx.close").noargs();;
    }
}

生产者

@Service
public class RabbitMQProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Value("${spring.rabbitmq.exchange}")
    private String exchange;
    @Value("${spring.rabbitmq.routingkey}")
    private String routingKey;
    @Value("${spring.rabbitmq.queue}")
    private String queue;
    
    public void send(String message) {
        rabbitTemplate.convertAndSend(queue, message, msg -> {
	       	msg.getMessageProperties()
	       	//当将消息的投递模式设置为 PERSISTENT 时,RabbitMQ 会将消息持久化存储到磁盘上。
	       	//即使 RabbitMQ 服务重启,消息也不会丢失。
        	//相反,如果投递模式为 TRANSIENT (默认),则消息仅保存在内存中,RabbitMQ 服务重启后将会丢失。
	       	.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
	       	//RabbitMQ 将根据消息的优先级进行投递,优先级高的消息将优先被消费者消费。
	       	.setPriority(priority);
            return msg;
        });
    }
}

消费者

@Service
public class RabbitMQConsumer {
    @RabbitListener(queues = "${spring.rabbitmq.queue}")
    public void receivedMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 处理消息
            System.out.println("Received message: " + message);
            // 手动确认消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 处理异常,拒绝消息
            channel.basicNack(deliveryTag, false, true);
            //或者重新入队
            channel.basicReject(deliveryTag, true);
        }
    }
}

小结

生产环境中,可以增加消费者并发数,提升消息处理能力。
可以将消息进行压缩,减少网络开销
配置合理的消息 TTL,避免消息积压。

标签: #RabbitMQ 10 #软件开发 1171 #JAVA 991
相关文章

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

SpringBoot整合异步任务执行 2024-10-08 11:24

同步任务: 同步任务是在单线程中按顺序执行,每次只有一个任务在执行,不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务,无法同时处理多个任务,响应慢,影响用 户体验 异步任务: 异步任务是在多线程中同时执行,多个任务可以并发执行,同时处理多个请求,响应快,资源

springboot kafka多数据源,通过配置动态加载发送者和消费者 2024-10-08 11:24

前言 最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。 准备test kafka本身非常容易上手,如果我们需要单元测试,引入ja

SpringBoot 集成 Redis 2024-10-08 11:24

一:SpringBoot 集成 Redis ①Redis是一个 NoSQL(not only)数据库, 常作用缓存 Cache 使用。 ②Redis是一个中间件、是一个独立的服务器;常用的数据类型: string , hash ,set ,zset , list ③通过Redis客户端可以使用多种语

SpringBoot整合QQ邮箱 2024-10-08 11:24

SpringBoot可以通过导入依赖的方式集成多种技术,这当然少不了我们常用的邮箱,现在本章演示SpringBoot整合QQ邮箱发送邮件…. 下面按步骤进行: 1.获取QQ邮箱授权码 1.1 登录QQ邮箱 1.2 开启SMTP服务 找到下图中的SMTP服务区域,如果当前账号未开启的话自己手动开启。

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.