锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. RocketMQ基础使用

RocketMQ基础使用

0
  • 软件开发
  • 发布于 2024-08-19
  • 1 次阅读
黄健
黄健

环境准备

1.软件准备:

RocketMQ版本:4.5.1

2.环境要求

  • JDK 1.8.0
  • Linux64位系统(CentOS Linux release 7.9.2009)
  • 源码安装需要安装Maven 3.2.x
  • 4G+ free 磁盘空间

3.安装及启动
下载rocketmq

wget https://archive.apache.org/dist/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip

4.解压文件

unzip rocketmq-all-4.5.1-bin-release.zip
mv rocketmq-all-4.5.1-bin-release rocketmq

5.修改/etc/profile添加home目录

vim /etc/profile

#在最下面添加下面两句, 目录需要根据自己的实际目录进行修改
export ROCKET_HOME=/root/rocketmq/rocketmq
export PATH=$PATH:${ROCKET_HOME}/bin

source /etc/profile

6.启动NameServer

# 1.启动NameServer
mqnamesrv
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log

7.启动Broker

#启动前需要修改下broker的启动脚本,调整使用内存的大小否则可能会启动失败
vim bin/runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m"

# 1.启动Broker
mqbroker -n localhost:9876
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log

RocketMQ环境测试

1.接收消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

2.发送消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

3.关闭RocketMQ

# 1.关闭NameServer
mqshutdown namesrv
# 2.关闭Broker
mqshutdown broker

JAVA连接使用

创建maven工程导入依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

同步模式生产者+pull模式消费者

创建同步模式生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * @program: rocketmq-study
 * @Description
 * @author Elvis
 * @date 2021-08-28 15:47    
 */
public class DemoProducer {
   
    public static void main(String[] args) throws Exception {
   
        // 在实例化生产者的同时,指定了生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_grp_01");

        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.31.10:9876");

        // 对生产者进行初始化,然后就可以使用了
        producer.start();

        // 创建消息,第一个参数是主题名称,第二个参数是消息内容
        Message message = new Message("topic_demo_01", "hello world demo 01".getBytes());

        // 发送消息
        SendResult result = producer.send(message);
        System.out.println(result);

        producer.shutdown();
    }
}

创建pull模式消费者

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;
import java.util.Set;

/**
 * @program: rocketmq-study
 * @Description
 * @author Elvis
 * @date 2021-08-28 16:02    
 */
public class DemoPullConsumer {
   
    public static void main(String[] args) throws Exception {
   
        // 拉取消息的消费者实例化,同时指定消费组名称
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("demo_consumer_grp_01");

        // 设置nameserver的地址
        consumer.setNamesrvAddr("192.168.31.10:9876");

        // 对消费者进行初始化,然后就可以使用了
        consumer.start();

        // 获取指定主题的消息队列集合
        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues("topic_demo_01");

        for (MessageQueue queue : queues) {
   
            // 第一个参数是MessageQueue对象,代表了当前主题的一个消息队列
            // 第二个参数是一个表达式,对接收的消息按照tag进行过滤
            // 支持"tag1 || tag2 || tag3"或者 "*"类型的写法;null或者"*"表示不对消息进行tag过滤
            // 第三个参数是消息的偏移量,从这里开始消费
            // 第四个参数表示每次最多拉取多少条消息
            PullResult result = consumer.pull(queue, "*", 0, 10);
            // 打印消息队列的信息
            System.out.println("message******queue******" + queue);
            List<MessageExt> foundList = result.getMsgFoundList();
            if (foundList == null) continue;
            for (MessageExt messageExt: foundList) {
   
                System.out.println(messageExt);
                System.out.println(new String(messageExt.getBody(), "utf-8"));
            }
        }

        consumer.shutdown();
    }
}

异步模式生产者+push模式消费者

创建异步模式生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * @program: rocketmq-study
 * @Description
 * @author Elvis
 * @date 2021-08-28 16:26    
 */
public class DemoProducerAsync {
   
    public static void main(String[] args) throws Exception {
   
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_grp_01");

        producer.setNamesrvAddr("192.168.31.10:9876");

        producer.start();


        for (int i = 0; i < 100; i++) {
   
            // 创建消息,第一个参数是主题名称,第二个参数是消息内容
            Message message = new Message("topic_demo_02", ("hello world demo " + i).getBytes());

            producer.send(message, new SendCallback() {
   
                @Override
                public void onSuccess(SendResult sendResult) {
   
                    System.out.println("send success " + sendResult);
                }

                @Override
                public void onException(Throwable e) {
   
                    System.out.println("send failed " + e.getMessage());
                }
            });
        }

        // 由于是异步发送消息,上面循环结束之后,消息可能还没收到broker的响应
        // 如果不sleep一会儿,就报错
        Thread.sleep(10_000);

        // 关闭生产者
        producer.shutdown();
    }
}

创建push模式消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @program: rocketmq-study
 * @Description
 * @author Elvis
 * @date 2021-08-28 16:30    
 */
public class DemoPushConsumer {
   
    public static void main(String[] args) throws  Exception {
   
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_grp_01");

        consumer.setNamesrvAddr("192.168.31.10:9876");

        consumer.subscribe("topic_demo_02", "*");

        consumer.setMessageListener(new MessageListenerConcurrently() {
   
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   
                MessageQueue queue = context.getMessageQueue();
                System.out.println(queue);

                for (MessageExt message : msgs) {
   
                    try {
   
                        System.out.println(new String(message.getBody(), "utf-8"));
                    } catch (UnsupportedEncodingException e) {
   
                        e.printStackTrace();
                    }

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 初始化消费者,之后开始消费消息
        consumer.start();

        // 此处只是示例,生产中除非运维关掉,否则不应停掉,长服务
//        Thread.sleep(30_000);
//        // 关闭消费者
//        consumer.shutdown();

    }
}

SpringBoot整合RocketMQ客户端

创建maven工程导入依赖

    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.2.9.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.2.9.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

生产者

producer创建yaml配置文件

rocketmq:
  name-server: 192.168.31.10:9876
  producer:
    group: springboot-produce

启动类

@SpringBootApplication
public class RunProducer {
   
    public static void main(String[] args) {
   
        SpringApplication.run(RunProducer.class, args);
    }
}

测试类

import com.elvis.RunProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @program: rocketmq-study
 * @Description
 * @author Elvis
 * @date 2021-08-28 17:01    
 */
@SpringBootTest(classes = {
   RunProducer.class})
public class RocketMQProducerTest {
   

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Test
    public void test01() {
   
        rocketMQTemplate.convertAndSend("topic_springboot_01", "hello springboot demo");
    }

    @Test
    public void test02() {
   
        for (int i = 0; i < 100; i++) {
   
            rocketMQTemplate.convertAndSend("topic_springboot_01", "hello springboot demo" + i);
        }
    }
}

消费者

yaml配置文件

rocketmq:
  name-server: 192.168.31.10:9876

启动类

@SpringBootApplication
public class RunConsumer {
   
    public static void main(String[] args) {
   
        SpringApplication.run(RunConsumer.class, args);
    }
}

创建监听器

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * @program: rocketmq-study
 * @Description
 * @author Elvis
 * @date 2021-08-28 17:27    
 */
@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_springboot_01", consumerGroup = "consumer-springboot")
public class MyRocketListener implements RocketMQListener<String> {
   

    @Override
    public void onMessage(String message) {
   
        // 处理broker推送过来的消息
        log.info(message);
    }
}

先启动消费者程序,再启动生产者测试程序

原文链接: https://blog.csdn.net/Kiven_ch/article/details/119967050

标签: #RocketMQ 5 #软件开发 1171 #JAVA 991
相关文章

万字:支付“核心系统”详解 2024-11-02 15:33

专栏作者:隐墨星辰 \| 主编:陈天宇宙 这篇文章也尝试化繁为简,探寻支付系统的本质,讲清楚在线支付系统最核心的一些概念和设计理念。 虽然支付行业已经过了风头最劲的时光,但跨境支付仍然在蓬勃发展,每年依然有很多新人进入这个行业,这篇文章尝试为这些刚入行的新人提供一点帮助。 文章只介绍一些支付行业十几

资深支付架构师视角:实战从问题定义到代码落地的完整套路 2024-11-02 15:33

前言 今天从一个实际案例入手,介绍站在架构师的角度,如何识别并定义问题,提炼需求,技术方案选型,再到详细设计,最后利用AI的能力协助写出核心的代码,验证与调优。 解决问题存在一定的模式,也可以称之为框架,总结出自己的思考和解题框架,以后再碰到同类型的问题就可以如庖丁解牛一样容易。 很多年前,我写代码

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

设计模式第16讲——迭代器模式(Iterator) 2024-10-08 11:24

一、什么是迭代器模式 迭代器模式是一种行为型设计模式,它提供了一种统一的方式来访问集合对象中的元素,而不是暴露集合内部的表示方式。简单地说,就是将遍历集合的责任封装到一个单独的对象中,我们可以按照特定的方式访问集合中的元素。 二、角色组成 抽象迭代器(Iterator):定义了遍历聚合对象所需的方法

vue2路由和vue3路由区别及原理 2024-10-08 11:24

一、Vue2 与 Vue3 路由的区别 1. 创建路由实例方式的不同 Vue 2 中,通过 Vue.use() 注册路由插件,并通过 new VueRouter() 来创建路由实例。 import Vue from 'vue';import VueRouter from 'vue-router';i

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.