锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. JAVA
  4. Spring boot封装rocket mq 教程

Spring boot封装rocket mq 教程

0
  • JAVA
  • 发布于 2024-08-14
  • 0 次阅读
黄健
黄健

本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net

1、rocket mq 版本

      5.1.3

2、pom 引入 rocket mq 依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.4</version>
        </dependency>

3、发送 MQ 消息工具类

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
 
@Slf4j
public class MqSendUtil {
 
    @SneakyThrows
    public static MessageId sendMq(String topic, String tag, String body, String... keys) {
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoint = "127.0.0.1:9080";
        // 消息发送的目标Topic名称,需要提前创建。
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        try (Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build()) {
            // 普通消息发送。
            Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    // 设置消息索引键,可根据关键字精确查找某条消息。
                    .setKeys(keys)
                    // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                    .setTag(tag)
                    // 消息体。
                    .setBody(body.getBytes())
                    .build();
            // 发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
            return sendReceipt.getMessageId();
        } catch (ClientException e) {
            log.error("Failed to send message", e);
            throw e;
        }
    }
 
    @SneakyThrows
    public static MessageId sendMqNoTag(String topic, String body, String... keys) {
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoint = "127.0.0.1:9080";
        // 消息发送的目标Topic名称,需要提前创建。
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        try (Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build()) {
            // 普通消息发送。
            Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    // 设置消息索引键,可根据关键字精确查找某条消息。
                    .setKeys(keys)
                    // 消息体。
                    .setBody(body.getBytes())
                    .build();
            // 发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            return sendReceipt.getMessageId();
        } catch (ClientException e) {
            log.error("Failed to send message", e);
            throw e;
        }
    }
 
 
}

4、发送 MQ 消息测试代码

import cn.hutool.core.util.IdUtil;
import org.recipe.draw.common.util.MqSendUtil;
 
public class MqSendTest {
    public static void test1() {
        MqSendUtil.sendMq("demo", "tag", "哈哈哈哈tag", IdUtil.getSnowflakeNextIdStr());
    }
 
    public static void main(String[] args) {
        test1();
    }
}

5、MessageContext 消息内容的封装

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
 
import java.util.Collection;
import java.util.Map;
 
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageContext {
 
    private String messageId;
    private String topic;
    private String body;
    private Map<String, String> properties;
    private Collection<String> keys;
    private Long deliveryTimestamp;
    private String bornHost;
    private Long bornTimestamp;
    private int deliveryAttempt;
 
 
}

6、AbstractMqConsumer 发送 mq 消息的抽象类

import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.boot.CommandLineRunner;
 
import java.nio.charset.StandardCharsets;
import java.util.Collections;
 
@Slf4j
public abstract class AbstractMqConsumer implements CommandLineRunner {
 
    public abstract String topic();
 
    public abstract String consumerGroup();
 
    public abstract String tag();
 
    public abstract void process(MessageContext messageContext);
 
 
    @Override
    public void run(String... args) throws Exception {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "127.0.0.1:9080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = StrUtil.isEmpty(tag()) ? "*" : tag();
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 为消费者指定所属的消费者分组,Group需要提前创建。
        String consumerGroup = consumerGroup();
        // 指定需要订阅哪个目标Topic,Topic需要提前创建。
        String topic = topic();
        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者分组。
                .setConsumerGroup(consumerGroup)
                // 设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 设置消费监听器。
                .setMessageListener(messageView -> {
                    // 处理消息并返回消费结果。
                    MessageContext context = toMessageContext(messageView);
//                    log.info("收到mq消息主体内容:{}",context);
                    try {
                        process(context);
                    } catch (Exception e) {
                        log.error("处理mq消息出现异常,消息已自动丢弃,不会再投入队列:", e);
                    }
                    return ConsumeResult.SUCCESS;
                })
                .build();
        log.info("消费者初始化完成,topic:{},tag:{},consumerGroup:{}", topic, tag, consumerGroup);
    }
 
    private MessageContext toMessageContext(MessageView messageView) {
        Long deliveryTimestamp = messageView.getDeliveryTimestamp().isPresent() ? messageView.getDeliveryTimestamp().get() : null;
        return MessageContext.builder()
                .messageId(messageView.getMessageId().toString())
                .topic(messageView.getTopic())
                .body(StandardCharsets.UTF_8.decode(messageView.getBody()).toString())
                .properties(messageView.getProperties())
                .keys(messageView.getKeys())
                .deliveryTimestamp(deliveryTimestamp)
                .bornHost(messageView.getBornHost())
                .deliveryAttempt(messageView.getDeliveryAttempt())
                .build();
    }
 
}

7、具体的消费类

topic 指定消费者订阅的话题,comsumerGroup 指明该消费者属于哪一个消费者分组,tag 表明是否要获取指定标签的消息,process 代表具体的业务处理逻辑,具体消息的内容可以 MessageContext 类里面获取

import lombok.extern.slf4j.Slf4j;
import org.recipe.draw.common.mqcomsumer.abstracts.AbstractMqConsumer;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.stereotype.Component;
 
@Component
@Slf4j
public class DemoConsumer extends AbstractMqConsumer {
 
    @Override
    public String topic() {
        return "demo";
    }
 
    @Override
    public String consumerGroup() {
        return "demo";
    }
 
    @Override
    public String tag() {
        return null;
    }
 
    @Override
    public void process(MessageContext messageContext) {
        log.info("收到消息:{}", messageContext);
    }
}
标签: #JAVA 991 #软件开发 1171
相关文章

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

SpringBoot整合异步任务执行 2024-10-08 11:24

同步任务: 同步任务是在单线程中按顺序执行,每次只有一个任务在执行,不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务,无法同时处理多个任务,响应慢,影响用 户体验 异步任务: 异步任务是在多线程中同时执行,多个任务可以并发执行,同时处理多个请求,响应快,资源

springboot kafka多数据源,通过配置动态加载发送者和消费者 2024-10-08 11:24

前言 最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。 准备test kafka本身非常容易上手,如果我们需要单元测试,引入ja

SpringBoot 集成 Redis 2024-10-08 11:24

一:SpringBoot 集成 Redis ①Redis是一个 NoSQL(not only)数据库, 常作用缓存 Cache 使用。 ②Redis是一个中间件、是一个独立的服务器;常用的数据类型: string , hash ,set ,zset , list ③通过Redis客户端可以使用多种语

SpringBoot整合QQ邮箱 2024-10-08 11:24

SpringBoot可以通过导入依赖的方式集成多种技术,这当然少不了我们常用的邮箱,现在本章演示SpringBoot整合QQ邮箱发送邮件…. 下面按步骤进行: 1.获取QQ邮箱授权码 1.1 登录QQ邮箱 1.2 开启SMTP服务 找到下图中的SMTP服务区域,如果当前账号未开启的话自己手动开启。

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.