Skip to content

二、消息在RabbitMQ中的流转过程及重要角色

1、AMQP

image-20220426174620917

image-20220509095724257

image-20220426174655984

2、核心概念

image-20220426174717877

Server(Broker)又称Broker,接受客户端的连接,实现AMQP实体服务,接受和发送消息的实例应用,RabbitMQ Server就是Message Broker
Virtual host用于进行逻辑隔离,最上层的消息路由,类似于网络中的namespace概念。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue。
Connection连接:应用程序与Broker的网络连接
Channel网络通道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可建立多个Channel,每个Channel代表一个会话任务
Message消息,服务器与应用程序之间传递的数据,由Properties和Body组成。Properties可以对消息进行装饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容
Exchange消息到达broker的第一站就是交换机,用于交换消息,根据路由键等分发规则转发消息到绑定的队列。
BindingExchange和Queue之间的虚拟连接,binding中可以包含routing key
Routing key一个路由规则,虚拟机可用它来确定如何路由一个特定消息
Queue也称为Message Queue,消息队列,保存消息并将它们转发给消费者

3、一条消息在RabbitMQ中的流转过程

image-20220426174854002

图示的主要流程如下:

1 生产者发送message消息的时候指定RoutingKey,然后消息被发送到Exchange

2 Exchange根据一些列规则将消息路由到指定的队列中

3 消费者consumer从队列中消费消息

整个流程主要就4个参与者message,exchange,queue,consumer

4、整体架构

image-20220426174909806

image-20220426174914818

5、重要角色

1、Message

属性名用处
contentType消息体的MIME类型,如application/json
contentEncoding消息的编码类型,如是否压缩
messageId消息的唯一性标识,由应用进行设置
timestamp消息的创建时间,整型,精确到秒
deliveryMode消息的持久化类型,1为非持久化,2为持久化,性能影响巨大
headers键/值对表,用户自定义任意的键和值
priority指定队列中消息的优先级

2、Exchange(重要)

image-20220509101234875

3、Queue

队列的常见属性

参数名用处
queue队列的名称
durable是否持久化,true为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息
exclusive设置是否排他,true为排他。如果一个队列被声明为排他队列,该队列仅对首次声明他的连接可见,并在连接断开时自动删除(即一个队列只能有一个消费者)
autoDelete设置是否自动删除,true为自动删除,自动删除的前提是,至少一个消费者连接到这个队列,之后所有与这个连接的消费者都断开时,才会自动删除
arguments设置队列的其他参数,如x-message-ttl,x-max-length
java
public class Queue extends AbstractDeclarable implements Cloneable {
    public static final String X_QUEUE_MASTER_LOCATOR = "x-queue-master-locator";
    private final String name;
    private final boolean durable;
    private final boolean exclusive;
    private final boolean autoDelete;
    private volatile String actualName;
}

4、队列中arguments属性

参数名目的
x-dead-letter-exchange死信交换机
x-dead-letter-routing-key死信消息的可选路由键
x-expires队列在指定毫秒数后被删除
x-ha-policy创建HA队列
x-ha-nodesHA队列的分布节点
x-max-length队列的最大消息数
x-message-ttl毫秒为单位的消息过期时间,队列级别
x-max-priority最大优先值为255的队列优先排序功能

5、pom依赖

xml
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xue</groupId>
    <artifactId>spring-boot-rabbitmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/>
    </parent>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!--rabbitmq
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

6、交换机程序说明

一般先启动消费者

1、Direct定向

1、生产者

需新建 test_direct_exchange 交换机

java
package com.xue.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * @Author: xueqimiao
 * @Date: 2022/5/9 10:33
 */
public class Producer4DirectExchange {

    public static final String QUEUE_NAME = "test_direct_queue";
    public static final String EXCHANGE_NAME = "test_direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过连接创建channel
        Channel channel = connection.createChannel();


        //4 创建并定义队列属性
        /* String queue,    队列名称
           boolean durable, 是否持久化
           boolean exclusive, 是否独占本次连接
           boolean autoDelete, 没人用时自动删除
           Map<String, Object> arguments 其它额外参数
        */
        //5 通过channel发送消息
        String msg = "this is direct exchange test" + UUID.randomUUID();
        String routingKey = "abc";

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        channel.close();
        connection.close();

        System.out.println("-----生产者发送消息完毕。。。。。");
    }

}

2、消费者

java
package com.xue.exchange;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author: xueqimiao
 * @Date: 2022/5/9 10:37
 */
public class Consumer4DirectExchange {
    public static final String QUEUE_NAME = "test_direct_queue";
    public static final String EXCHANGE_NAME = "test_direct_exchange";
    public static final String EXCHANGE_TYPE = "direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过连接创建channel
        Channel channel = connection.createChannel();

        //4 交换机+队列+绑定关系
        //表示声明了一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //建立一个绑定关系
        String routingKey = "abc";
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("  -------" + new String(body, "UTF-8"));
                System.out.println("路由key: " + envelope.getRoutingKey() + " getExchange\t" + envelope.getExchange() + " getDeliveryTag\t" + envelope.getDeliveryTag());
            }
        };

        //5 通过channel消费消息
        // boolean autoAck 消息接受到了自动向mq服务器回复收到了,mq收到后会自动删除消息,false需要手动确认
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

对消费者改改路由词,再让生产者保持原样发送试试

每一次消费者,都会对原有队列加入新的 ----路由词

2、Fanout广播

需新建test_fanout_exchange交换机

生产者

java
package com.xue.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * @Author: xueqimiao
 * @Date: 2022/5/9 10:41
 */
public class Producer4FanoutExchange {
    public static final String EXCHANGE_NAME = "test_fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过连接创建channel
        Channel channel = connection.createChannel();

        //4 创建并定义队列属性

        //5 通过channel发送消息
        String routingKey = "";
        String msg = "this is fanout exchange test:  " + UUID.randomUUID().toString() + "\t " + routingKey;

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        channel.close();
        connection.close();

        System.out.println("-----生产者发送消息完毕。。。。。");
    }
}

消费者

java
package com.xue.exchange;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author: xueqimiao
 * @Date: 2022/5/9 10:43
 */
public class Consumer4FanoutExchange {
    public static final String QUEUE_NAME = "test_fanout_queue1";
    public static final String EXCHANGE_NAME = "test_fanout_exchange";
    public static final String EXCHANGE_TYPE = "fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过连接创建channel
        Channel channel = connection.createChannel();

        //4 交换机+队列+绑定关系
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_TYPE);

        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("----msg result: " + new String(body, "utf-8"));
            }
        });

    }
}

3、Topic通配符

需新建test_topic_exchange交换机

生产者

java
package com.xue.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * @Author: xueqimiao
 * @Date: 2022/5/9 10:55
 */
public class Producer4TopicExchange {
    public static final String EXCHANGE_NAME = "test_topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过连接创建channel
        Channel channel = connection.createChannel();

        //4 创建并定义队列属性
        //channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        //5 通过channel发送消息
        String routingKey = "abc.1.x";
        String msg = "this is topic exchange test" + UUID.randomUUID().toString() + "\t " + routingKey;

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        channel.close();
        connection.close();

        System.out.println("-----生产者发送消息完毕。。。。。");
    }
}

消费者

java
package com.xue.exchange;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author: xueqimiao
 * @Date: 2022/5/9 10:56
 */
public class Consumer4TopicExchange {
    public static final String QUEUE_NAME = "test_topic_queue";
    public static final String EXCHANGE_NAME = "test_topic_exchange";
    public static final String EXCHANGE_TYPE = "topic";
    public static final String ROUTINGKEY = "abc.*";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3 通过连接创建channel
        Channel channel = connection.createChannel();

        //4 交换机+队列+绑定关系
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);

        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("----msg result: " + new String(body, "utf-8"));
            }
        });

    }
}