锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. RabbitMQ工作流程和工作模式详解

RabbitMQ工作流程和工作模式详解

0
  • 软件开发
  • 发布于 2024-08-19
  • 1 次阅读
黄健
黄健

RabbitMQ工作流程

生产者发送消息的流程

  1. 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
  2. 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
  3. 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
  4. 生产者通过bindingKey (绑定Key)将交换器和队列绑定( binding )起来
  5. 生产者发送消息至RabbitMQ Broker,其中包含routingKey (路由键)、交换器等信息
  6. 相应的交换器根据接收到的routingKey 查找相匹配的队列。
  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中。
  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  9. 关闭信道。
  10. 关闭连接。

消费者接收消息的过程

  1. 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
  2. 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作
  3. 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
  4. 消费者确认( ack) 接收到的消息。
  5. RabbitMQ 从队列中删除相应己经被确认的消息。
  6. 关闭信道。
  7. 关闭连接。

简单案例

  Hello World一对一的简单模式。生产者直接发送消息给RabbitMQ,另一端消费。未定义和指定Exchange的情况下,使用的是AMQP default这个内置的Exchange。
引入maven依赖

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

HelloProducer

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Rabbitmq是一个消息broker:接收消息,传递给下游应用
 *
 * 术语:
 * Producing就是指发送消息,发送消息的程序是Producer
 * Queue指的是RabbitMQ内部的一个组件,消息存储于queue中。queue使用主机的内存和磁盘存
 储,收到内存和磁盘空间的限制
 * 可以想象为一个大的消息缓冲。很多Producer可以向同一个queue发送消息,很多消费者
 可以从同一个queue消费消息。
 * Consuming就是接收消息。一个等待消费消息的应用程序称为Consumer
 *
 * 生产者、消费者、队列不必在同一台主机,一般都是在不同的主机上的应用。一个应用可以同时是
 生产者和消费者。
 *
 */

/**
 * @program: rabbitmq_study
 * @Description
 * @author Elvis
 * @date 2021-08-22 16:22    
 */
public class HelloProducer {
   
    public static void main(String[] args) throws IOException, TimeoutException {
   
        ConnectionFactory factory = new ConnectionFactory();

        // 设置主机名 hostname
        factory.setHost("192.168.31.10");
        // amqp的端口号
        factory.setPort(5672);
        // 用户名
        factory.setUsername("root");
        // 密码
        factory.setPassword("123456");
        // 设置虚拟主机名称  /在url中的转义字符 %2f
        factory.setVirtualHost("/");

        // 建立TCP连接
        Connection connection = factory.newConnection();

        // 获取通道
        Channel channel = connection.createChannel();

        // 声明消息队列   消息队列名称
        // 是否是持久化的
        // 是否是排他的
        // 是否是自动删除的
        // 消息队列的属性信息。使用默认值;
        channel.queueDeclare("hello", false, false, false, null);
        channel.basicPublish("", "hello", null, "hello world".getBytes());

        channel.close();
        connection.close();
    }
}

HelloConsumer push模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

/**
 * @program: rabbitmq_study
 * @Description
 * @author Elvis
 * @date 2021-08-22 16:42    
 */
public class HelloConsumer {
   
    public static void main(String[] args) throws Exception {
   
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("hello", false, false, false, null);

        channel.basicConsume("hello", (consumerTag, message)->{
   
            System.out.println((new String(message.getBody())));
        }, (consumerTag)->{
   });

        // channel.close();;
        // connection.close();
    }
}

HelloGetConsumer get模式

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

/**
 * @program: rabbitmq_study
 * @Description
 * @author Elvis
 * @date 2021-08-22 16:42    
 */
public class HelloGetConsumer {
   
    public static void main(String[] args) throws Exception {
   
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("hello", false, false, false, null);

        GetResponse getResponse = channel.basicGet("hello", true);

        System.out.println(new String(getResponse.getBody()));

        channel.close();;
        connection.close();
    }
}

先启动消费者,再启动生产者

Connection 和Channel关系

  生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。


为什么不直接使用TCP连接,而是使用信道?

RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。

当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。

当信道本身的流量很大时,一个Connection 就会产生性能瓶颈,流量被限制。需要建立多个Connection ,分摊信道。具体的调优看业务需要。

信道在AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。

channel.exchangeDeclare
channel.queueDeclare
channel.basicPublish
channel.basicConsume
// ...

RabbitMQ 相关的API与AMQP紧密相连,比如channel.basicPublish 对应AMQP 的Basic.Publish命令。

RabbitMQ工作模式详解

官网地址:https://www.rabbitmq.com/getstarted.htm

Work Queue

生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。

Producer

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @program: rabbitmq_study
 * @Description
 * @author Elvis
 * @date 2021-08-22 17:11    
 */
public class Producer {
   
    public static void main(String[] args) throws Exception{
   
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个交换器
        channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null);

        // 声明消息队列
        channel.queueDeclare("queue.wq", true, false, false, null);

        // 将交换器绑定到消息队列,同时指定绑定键(binding-key)
        channel.queueBind("queue.wq", "ex.wq", "key.wq");

        for (int i = 0; i < 15; i++) {
   
            channel.basicPublish("ex.wq", "key.wq", null, ("工作队列:" + i).getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }
}

Consumer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @program: rabbitmq_study
 * @Description
 * @author Elvis
 * @date 2021-08-22 18:19    
 */
public class Consumer {
   
    public static void main(String[] args)  throws Exception{
   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明消息队列
        channel.queueDeclare("queue.wq", true, false, false, null);

        channel.basicConsume("queue.wq", (consumerTag, message)->{
   
            System.out.println(new String(message.getBody()));
        }, (consumerTag)->{
   });

    }
}

先启动consumer再启动producer

发布订阅模式

  使用fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。

  消息广播给所有订阅该消息的消费者。

  在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。

  生产者将消息发送给交换器。交换器非常简单,从生产者接收消息,将消息推送给消息队列。交换器必须清楚地知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器类型。


交换器的类型有: direct 、topic 、headers 和fanout 四种类型。发布订阅使用fanout。创建交换器,名字叫logs :

channel.exchangeDeclare("1 logs", "fanout");

  fanout 交换器很简单,从名字就可以看出来(用风扇吹出去),将所有收到的消息发送给它知道的所有的队列。

rabbitmqctl list_exchanges

列出RabbitMQ的交换器,包括了amq.* 的和默认的(未命名)的交换器。

未命名交换器

  在前面的那里中我们没有指定交换器,但是依然可以向队列发送消息。这是因为我们使用了默认的交换器。

channel.basicPublish("", "hello", null, message.1 getBytes());

  第一个参数就是交换器名称,为空字符串。直接使用routingKey向队列发送消息,如果该routingKey指定的队列存在的话。

  现在,向指定的交换器发布消息:

channel.basicPublish("logs", "", null, message.getBytes());

临时队列

  前面我们使用队列的名称,生产者和消费者都是用该名称来发送和接收该队列中的消息。

  首先,我们无论何时连接RabbitMQ的时候,都需要一个新的,空的队列。我们可以使用随机的名字创建队列,也可以让服务器帮我们生成随机的消息队列名字。

  其次,一旦我们断开到消费者的连接,该队列应该自动删除。

String queueName = channel.queueDeclare().getQueue();

producer

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @program: rabbitmq_study
 * @Description
 * @author Elvis
 * @date 2021-08-22 19:02    
 */
public class Producer {
   
    public static void main(String[] args) throws Exception {
   
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个交换器
        channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);

        for (int i = 0; i < 15; i++) {
   
             channel.basicPublish("ex.myfan", "", null, ("hello world fan: " + i).getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }
}

分别创建三个consumer

consumer1

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @program: rabbitmq_study
 * @Description
 * @author Elvis
 * @date 2021-08-22 19:04    
 */
public class Consumer {
   
    public static void main(String[] args)  throws Exception{
   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个交换器
        channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);

        String queue = channel.queueDeclare().getQueue();

        // fanout类型的交换器绑定不需要routingkey
        channel.queueBind(queue, "ex.myfan", "");

        channel.basicConsume(queue, (consumerTag, message)->{
   
            System.out.println( "One   " + new String(message.getBody()));
        }, (consumerTag)->{
   });

    }
}

consumer2

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @program: rabbitmq_study
 * @Description
 * @author Elvis
 * @date 2021-08-22 19:04    
 */
public class TwoConsumer {
   
    public static void main(String[] args)  throws Exception{
   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个交换器
        channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);

        String queue = channel.queueDeclare().getQueue();

        // fanout类型的交换器绑定不需要routingkey
        channel.queueBind(queue, "ex.myfan", "");

        channel.basicConsume(queue, (consumerTag, message)->{
   
            System.out.println( "Two   " + new String(message.getBody()));
        }, (consumerTag)->{
   });

    }
}

consumer3

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @program: rabbitmq_study
 * @Description
 * @author Elvis
 * @date 2021-08-22 19:04    
 */
public class ThreeConsumer {
   
    public static void main(String[] args)  throws Exception{
   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明一个交换器
        channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);

        String queue = channel.queueDeclare().getQueue();

        // fanout类型的交换器绑定不需要routingkey
        channel.queueBind(queue, "ex.myfan", "");

        channel.basicConsume(queue, (consumerTag, message)->{
   
            System.out.println( "Three   " + new String(message.getBody()));
        }, (consumerTag)->{
   });

    }
}


当消费者启动起来之后,命令rabbitmqctl list_bindings 列出绑定关系:

路由模式

  使用direct 类型的Exchange,发N条消费并使用不同的routingKey ,消费者定义队列并将队列、routingKey 、Exchange绑定。此时使用direct 模式Exchagne必须要routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费。

  上一个模式中,可以将消息广播到很多接收者。

  现在我们想让接收者只接收部分消息,如,我们通过直接模式的交换器将关键的错误信息记录到log文件,同时在控制台正常打印所有的日志信息。

  绑定
上一模式中,交换器的使用方式:

channel.queueBind(queueName, 1 EXCHANGE_NAME, "");

绑定语句中还有第三个参数: routingKey :

channel.queueBind(queueName, EXCHANGE_NAME, "black");

bindingKey 的作用与具体使用的交换器类型有关。对于fanout 类型的交换器,此参数设置无效,系统直接忽略。

direct交换器

  分布式系统中有很多应用,这些应用需要运维平台的监控,其中一个重要的信息就是服务器的日志记录。

  我们需要将不同日志级别的日志记录交给不同的应用处理。

  如何解决?

  使用direct交换器

  如果要对不同的消息做不同的处理,此时不能使用fanout 类型的交换器,因为它只会盲目的广播消息。
我们需要使用direct 类型的交换器。direct 交换器的路由算法很简单:只要消息的routingKey 和队列的bindingKey 对应,消息就可以推送给该队列。


上图中的交换器X 是direct 类型的交换器,绑定的两个队列中,一个队列的bindingKey 是orange ,另一个队列的bindingKey 是black 和green 。

  如此,则routingKey 是orange 的消息发送给队列Q1, routingKey 是black 和green 的消息发送给Q2队列,其他消息丢弃。


上图中,我们使用direct 类型的交换器X ,建立了两个绑定:队列Q1根据bindingKey 的值black 绑定到交换器X ,队列Q2根据bindingKey 的值black 绑定到交换器X ;交换器X 会将消息发送给队列Q1和队列Q2。交换器的行为跟fanout 的行为类似,也是广播。

  在案例中,我们将日志级别作为routingKey 。

EmitLogsDirect.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class EmitLogsDirect {
   
    public static void main(String[] args) throws IOException,
            TimeoutException {
   
        ConnectionFactory factory = new ConnectionFactory();
		factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String servrity = null;
// 声明direct类型的交换器logs
        channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
        for (int i = 0; i < 100; i++) {
   
            switch (i % 3) {
   
                case 0:
                    servrity = "info";
                    break;
                case 1:
                    servrity = "warn";
                    break;
                case 2:
                    servrity = "error";
                    break;
                default:
                    System.err.println("log错误,程序退出");
                    System.exit(-1);
            }
            String logStr = "这是 【" + servrity + "】 的消息";
            channel.basicPublish("direct_logs", servrity, null,
                    logStr.getBytes("UTF-8"));
        }
    }
}

ReceiveErrorLogsDirect.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveErrorLogsDirect {
   
    public static void main(String[] args) throws IOException,
            TimeoutException {
   
        ConnectionFactory factory = new ConnectionFactory();
		factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f"); factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();
// 将logs交换器和queueName队列通过bindingKey:error绑定
        channel.queueBind(queueName, "direct_logs", "error");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" +
                    message + "'");
        };
        channel.basicConsume(queueName, deliverCallback, consumerTag ->
        {
   });
    }
}

ReceiveWarnInfoLogsDirect.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveWarnInfoLogsDirect {
   
    public static void main(String[] args) throws IOException,
            TimeoutException {
   
        ConnectionFactory factory = new ConnectionFactory();
		factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();
// 将logs交换器和queueName队列通过bindingKey:warn绑定
        channel.queueBind(queueName, "direct_logs", "warn");
// 将logs交换器和queueName队列通过bindingKey:info绑定
        channel.queueBind(queueName, "direct_logs", "info");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" +
                    message + "'");
        };
        channel.basicConsume(queueName, deliverCallback, consumerTag ->
        {
   });
    }
}

主题模式

  使用topic 类型的交换器,队列绑定到交换器、bindingKey 时使用通配符,交换器将消息路由转发到具体队列时会根据消息routingKey 模糊匹配,比较灵活。

  上个模式中,我们通过direct 类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。

  这里有一个限制,加入现在我不仅想根据日志级别划分日志消息,还想根据日志来源划分日志,怎么做?

  比如,我想监听cron服务发送的error 消息,又想监听从kern服务发送的所有消息。

  此时可以使用RabbitMQ的主题模式( Topic )。

  要想topic 类型的交换器, routingKey 就不能随便写了,它必须得是点分单词。单词可以随便写,生产中一般使用消息的特征。如:"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"等。该点分单词字符串最长255字节。

  bindingKey 也必须是这种形式。topic 类型的交换器背后原理跟direct 类型的类似:只要队列的bindingKey 的值与消息的routingKey 匹配,队列就可以收到该消息。有两个不同:

  1. * (star)匹配一个单词
  2. 匹配0到多个单词


上图中,我们发送描述动物的消息。消息发送的时候指定的routingKey 包含了三个词,两个点。第一个单词表示动物的速度,第二个是颜色,第三个是物种:<speed>.<color>.<species>。

  创建三个绑定:Q1绑定到" *.orange.* "Q2绑定到" *.*.rabbit "和" lazy.# "。

  1. Q1关注orange颜色动物的消息
  2. Q2关注兔子的消息,以及所有懒的动物消息

  如果不能匹配,就丢弃消息。
如果发送的消息routingKey 是" lazy.orange.male.rabbit ",则会匹配最后一个绑定。
如果在topic 类型的交换器中bindingKey 使用# ,则就是fanout 类型交换器的行为。
如果在topic 类型的交换器中bindingKey 中不使用* 和# ,则就是direct 类型交换器的行为。

EmitLogTopic.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

public class EmitLogTopic {
   
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String[] SPEED = {
   
            "lazy",
            "quick",
            "normal"
    };
    private static final String[] COLOR = {
   
            "black",
            "orange",
            "red",
            "yellow",
            "blue",
            "white",
            "pink"
    };
    private static final String[] SPECIES = {
   
            "dog",
            "rabbit",
            "chicken",
            "horse",
            "bear",
            "cat"
    };
    private static final Random RANDOM = new Random();
    public static void main(String[] args) throws IOException,
            TimeoutException {
   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.10");
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("123456");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String message = null;
        String routingKey = null;
        String speed = null;
        String color = null;
        String species = null;
        for (int i = 0; i < 10; i++) {
   
            speed = SPEED[RANDOM.nextInt(SPEED.length)];
            color = COLOR[RANDOM.nextInt(COLOR.length)];
            species = SPECIES[RANDOM.nextInt(SPECIES.length)];
            message = speed + "-" + color + "-" + species;
            routingKey = speed + "." + color + "." + species;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null,
                    message.getBytes());
        }
        System.out.println(" [x] Sent '" + routingKey + "':'" + message +
                "'");
    }
}

EmitLogTopic1.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

public class EmitLogTopic1 {
   
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String[] SPECIES = {
   
            "dog",
            "rabbit",
            "chicken",
            "horse",
            "bear",
            "cat"
    };
    private static final Random RANDOM = new Random();
    public static void main(String[] args) throws IOException,
            TimeoutException {
   
        ConnectionFactory factory = new ConnectionFactory();
		factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String message = null;
        String routingKey = null;
        String speed = null;
        String species = null;
        for (int i = 0; i < 10; i++) {
   
            speed = "lazy";
            species = SPECIES[RANDOM.nextInt(SPECIES.length)];
            message = speed + "-" + species;
            routingKey = speed + "." + species;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null,
                    message.getBytes());
        }
        System.out.println(" [x] Sent '" + routingKey + "':'" + message +
                "'");
    }
}

ReceiveLogTopic.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class ReceiveLogsTopic {
   
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws IOException,
            TimeoutException {
   
        ConnectionFactory factory = new ConnectionFactory();
		factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        DeliverCallback callback = (consumerTag, message) -> {
   
            System.out.println("*.*.rabbit 匹配到的消息:" + new
                    String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(queueName, true, callback, consumerTag -> {
   });
    }
}

ReceiveLogTopic1.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class ReceiveLogsTopic1 {
   
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws IOException,
            TimeoutException {
   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        DeliverCallback callback = (consumerTag, message) -> {
   
            System.out.println("*.orange.* 匹配到的消息:" + new
                    String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(queueName, true, callback, consumerTag -> {
   });
    }
}

ReceiveLopTopic2.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsTopic2 {
   
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws IOException,
            TimeoutException {
   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.*.*");
        DeliverCallback callback = (consumerTag, message) -> {
   
            System.out.println("lazy.*.* 匹配到的消息:" + new
                    String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(queueName, true, callback, consumerTag -> {
   });
    }
}

ReceiveLogTopic3.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogsTopic3 {
   
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws IOException,
            TimeoutException {
   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.31.10:5672/%2f");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        DeliverCallback callback = (consumerTag, message) -> {
   
            System.out.println("lazy.*.* 匹配到的消息:" + new
                    String(message.getBody(), "UTF-8"));
        };
        channel.basicConsume(queueName, true, callback, consumerTag -> {
   });
    }
}

原文链接: https://blog.csdn.net/Kiven_ch/article/details/119853942

标签: #RabbitMQ 10 #软件开发 1171
相关文章

万字:支付“核心系统”详解 2024-11-02 15:33

专栏作者:隐墨星辰 \| 主编:陈天宇宙 这篇文章也尝试化繁为简,探寻支付系统的本质,讲清楚在线支付系统最核心的一些概念和设计理念。 虽然支付行业已经过了风头最劲的时光,但跨境支付仍然在蓬勃发展,每年依然有很多新人进入这个行业,这篇文章尝试为这些刚入行的新人提供一点帮助。 文章只介绍一些支付行业十几

资深支付架构师视角:实战从问题定义到代码落地的完整套路 2024-11-02 15:33

前言 今天从一个实际案例入手,介绍站在架构师的角度,如何识别并定义问题,提炼需求,技术方案选型,再到详细设计,最后利用AI的能力协助写出核心的代码,验证与调优。 解决问题存在一定的模式,也可以称之为框架,总结出自己的思考和解题框架,以后再碰到同类型的问题就可以如庖丁解牛一样容易。 很多年前,我写代码

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

设计模式第16讲——迭代器模式(Iterator) 2024-10-08 11:24

一、什么是迭代器模式 迭代器模式是一种行为型设计模式,它提供了一种统一的方式来访问集合对象中的元素,而不是暴露集合内部的表示方式。简单地说,就是将遍历集合的责任封装到一个单独的对象中,我们可以按照特定的方式访问集合中的元素。 二、角色组成 抽象迭代器(Iterator):定义了遍历聚合对象所需的方法

vue2路由和vue3路由区别及原理 2024-10-08 11:24

一、Vue2 与 Vue3 路由的区别 1. 创建路由实例方式的不同 Vue 2 中,通过 Vue.use() 注册路由插件,并通过 new VueRouter() 来创建路由实例。 import Vue from 'vue';import VueRouter from 'vue-router';i

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.