Appearance
二、消息在RabbitMQ中的流转过程及重要角色
1、AMQP
2、核心概念
Server(Broker) | 又称Broker,接受客户端的连接,实现AMQP实体服务,接受和发送消息的实例应用,RabbitMQ Server就是Message Broker |
---|---|
Virtual host | 用于进行逻辑隔离,最上层的消息路由,类似于网络中的namespace概念。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue。 |
Connection | 连接:应用程序与Broker的网络连接 |
Channel | 网络通道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可建立多个Channel,每个Channel代表一个会话任务 |
Message | 消息,服务器与应用程序之间传递的数据,由Properties和Body组成。Properties可以对消息进行装饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容 |
Exchange | 消息到达broker的第一站就是交换机,用于交换消息,根据路由键等分发规则转发消息到绑定的队列。 |
Binding | Exchange和Queue之间的虚拟连接,binding中可以包含routing key |
Routing key | 一个路由规则,虚拟机可用它来确定如何路由一个特定消息 |
Queue | 也称为Message Queue,消息队列,保存消息并将它们转发给消费者 |
3、一条消息在RabbitMQ中的流转过程
图示的主要流程如下:
1 生产者发送message消息的时候指定RoutingKey,然后消息被发送到Exchange
2 Exchange根据一些列规则将消息路由到指定的队列中
3 消费者consumer从队列中消费消息
整个流程主要就4个参与者message,exchange,queue,consumer
4、整体架构
5、重要角色
1、Message
属性名 | 用处 |
---|---|
contentType | 消息体的MIME类型,如application/json |
contentEncoding | 消息的编码类型,如是否压缩 |
messageId | 消息的唯一性标识,由应用进行设置 |
timestamp | 消息的创建时间,整型,精确到秒 |
deliveryMode | 消息的持久化类型,1为非持久化,2为持久化,性能影响巨大 |
headers | 键/值对表,用户自定义任意的键和值 |
priority | 指定队列中消息的优先级 |
2、Exchange(重要)
3、Queue
队列的常见属性
参数名 | 用处 |
---|---|
queue | 队列的名称 |
durable | 是否持久化,true为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息 |
exclusive | 设置是否排他,true为排他。如果一个队列被声明为排他队列,该队列仅对首次声明他的连接可见,并在连接断开时自动删除(即一个队列只能有一个消费者) |
autoDelete | 设置是否自动删除,true为自动删除,自动删除的前提是,至少一个消费者连接到这个队列,之后所有与这个连接的消费者都断开时,才会自动删除 |
arguments | 设置队列的其他参数,如x-message-ttl,x-max-length |
java
public class Queue extends AbstractDeclarable implements Cloneable {
public static final String X_QUEUE_MASTER_LOCATOR = "x-queue-master-locator";
private final String name;
private final boolean durable;
private final boolean exclusive;
private final boolean autoDelete;
private volatile String actualName;
}
4、队列中arguments属性
参数名 | 目的 |
---|---|
x-dead-letter-exchange | 死信交换机 |
x-dead-letter-routing-key | 死信消息的可选路由键 |
x-expires | 队列在指定毫秒数后被删除 |
x-ha-policy | 创建HA队列 |
x-ha-nodes | HA队列的分布节点 |
x-max-length | 队列的最大消息数 |
x-message-ttl | 毫秒为单位的消息过期时间,队列级别 |
x-max-priority | 最大优先值为255的队列优先排序功能 |
5、pom依赖
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xue</groupId>
<artifactId>spring-boot-rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--rabbitmq
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
6、交换机程序说明
一般先启动消费者
1、Direct定向
1、生产者
需新建 test_direct_exchange
交换机
java
package com.xue.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 10:33
*/
public class Producer4DirectExchange {
public static final String QUEUE_NAME = "test_direct_queue";
public static final String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过连接创建channel
Channel channel = connection.createChannel();
//4 创建并定义队列属性
/* String queue, 队列名称
boolean durable, 是否持久化
boolean exclusive, 是否独占本次连接
boolean autoDelete, 没人用时自动删除
Map<String, Object> arguments 其它额外参数
*/
//5 通过channel发送消息
String msg = "this is direct exchange test" + UUID.randomUUID();
String routingKey = "abc";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
channel.close();
connection.close();
System.out.println("-----生产者发送消息完毕。。。。。");
}
}
2、消费者
java
package com.xue.exchange;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 10:37
*/
public class Consumer4DirectExchange {
public static final String QUEUE_NAME = "test_direct_queue";
public static final String EXCHANGE_NAME = "test_direct_exchange";
public static final String EXCHANGE_TYPE = "direct";
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过连接创建channel
Channel channel = connection.createChannel();
//4 交换机+队列+绑定关系
//表示声明了一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
//表示声明了一个队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//建立一个绑定关系
String routingKey = "abc";
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(" -------" + new String(body, "UTF-8"));
System.out.println("路由key: " + envelope.getRoutingKey() + " getExchange\t" + envelope.getExchange() + " getDeliveryTag\t" + envelope.getDeliveryTag());
}
};
//5 通过channel消费消息
// boolean autoAck 消息接受到了自动向mq服务器回复收到了,mq收到后会自动删除消息,false需要手动确认
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
对消费者改改路由词,再让生产者保持原样发送试试
每一次消费者,都会对原有队列加入新的 ----路由词
2、Fanout广播
需新建test_fanout_exchange
交换机
生产者
java
package com.xue.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 10:41
*/
public class Producer4FanoutExchange {
public static final String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过连接创建channel
Channel channel = connection.createChannel();
//4 创建并定义队列属性
//5 通过channel发送消息
String routingKey = "";
String msg = "this is fanout exchange test: " + UUID.randomUUID().toString() + "\t " + routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
channel.close();
connection.close();
System.out.println("-----生产者发送消息完毕。。。。。");
}
}
消费者
java
package com.xue.exchange;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 10:43
*/
public class Consumer4FanoutExchange {
public static final String QUEUE_NAME = "test_fanout_queue1";
public static final String EXCHANGE_NAME = "test_fanout_exchange";
public static final String EXCHANGE_TYPE = "fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过连接创建channel
Channel channel = connection.createChannel();
//4 交换机+队列+绑定关系
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_TYPE);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----msg result: " + new String(body, "utf-8"));
}
});
}
}
3、Topic通配符
需新建test_topic_exchange
交换机
生产者
java
package com.xue.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 10:55
*/
public class Producer4TopicExchange {
public static final String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过连接创建channel
Channel channel = connection.createChannel();
//4 创建并定义队列属性
//channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//5 通过channel发送消息
String routingKey = "abc.1.x";
String msg = "this is topic exchange test" + UUID.randomUUID().toString() + "\t " + routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
channel.close();
connection.close();
System.out.println("-----生产者发送消息完毕。。。。。");
}
}
消费者
java
package com.xue.exchange;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 10:56
*/
public class Consumer4TopicExchange {
public static final String QUEUE_NAME = "test_topic_queue";
public static final String EXCHANGE_NAME = "test_topic_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTINGKEY = "abc.*";
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建工厂connectionFactory,并为其设置ip+端口+virtual host
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3 通过连接创建channel
Channel channel = connection.createChannel();
//4 交换机+队列+绑定关系
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----msg result: " + new String(body, "utf-8"));
}
});
}
}