Appearance
三、MQ高级特性
1、永远的HelloWorld
相关概念
ConnectionFactory:获取连接工厂
Connection:一个链接
Channel:数据通信通道,可以发送和接收消息
Queue:具体的消息存储队列
Producer & Consumer:生产和消费者
1、第一组
生产者
java
package com.xue.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* @author xueqimiao
* @Date: 2022/5/9 10:37
*/
public class Producer {
public static final String QUEUE_NAME = "queue-hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String msg = "hello mq" + UUID.randomUUID().toString();
channel.basicPublish("", "queue-hello", null, msg.getBytes());
channel.close();
connection.close();
System.out.println("-----生产者消费发送完毕");
}
}
消费者
java
package com.xue.hello;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author xueqimiao
* @Date: 2022/5/9 10:37
*/
public class Consumer {
public static final String QUEUE_NAME = "queue-hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setVirtualHost("/");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("--------msg result: " + new String(body, "utf-8"));
System.out.println("------getExchange: " + envelope.getExchange());
System.out.println("------getRoutingKey: " + envelope.getRoutingKey());
}
});
}
}
总结
问题和异常
java
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
上面代码并没有写交换机的名字
为什么可以运行?
2、第二组
生产者
有2个消费者,如何消费
ConnectionUtil
java
package com.xue.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 11:01
*/
public class ConnectionUtil {
public static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("127.0.0.1");
}
public static Connection getConnection() throws IOException, TimeoutException {
return connectionFactory.newConnection();
}
}
2、什么是死信队列?(死信队列:DLX,dead-letter-exchange)
1 是什么
死信,在官网中对应的单词为“Dead Letter”。
死信队列:DLX,dead-letter-exchange “死信”是RabbitMQ中的一种消息机制,当在消费消息时如果队列里的消息出现以下情况:
2 消息变成死信有以下几种情况
2.1 消息被拒绝,使用 channel.basicNack 或 channel.basicReject 且此时requeue属性被设置为false。
2.2 消息超时过期,在队列的存活时间超过设置的TTL时间。
2.3 消息队列的消息数量已经超过最大队列长度,那么该消息将成为“死信”。
3 死信处理过程
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。也即利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新发布到另一个Exchange,这个Exchange就是DLX,可以监听这个队列中的消息做相应的处理。
4 死信队列设置
4.1 首先需要设置死信队列的exchange和queue,然后进行绑定
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: # #表示只要有消息到达了Exchange,那么都会路由到这个queue上
4.2 然后需要有一个监听,去监听这个队列进行处理
4.3 然后我们进行正常声明交换机、队列、绑定。只不过我们需要在正常队列加上一个参数:
arguments.put(" x-dead-letter-exchange","dlx.exchange");,
这样消息在过期、被拒绝、 队列在达到最大长时,消息就可以直接路由到死信队列!
也即告诉正常交换机,对于死信你处理不了的话,交给死信交换机。
https://www.rabbitmq.com/dlx.html
1、普通版
生产者
java
package com.xue.dlx;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 11:04
*/
public class ProducerDLX {
public static final String EXCHANGE_NAME = "test_normal_exchange";
public static final String QUEUE_NAME = "test_normal_queue";
public static final String ROUTINGKEY = "dlx.update";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("15000").contentEncoding("utf-8").build();
String msg = "hello,我是死信消息,超过15秒未被消费直接进入死信队列";
channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, props, msg.getBytes());
channel.close();
connection.close();
System.out.println("-----生产者发送消息完毕。。。。。");
}
}
消费者
java
package com.xue.dlx;
import com.rabbitmq.client.*;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 11:06
*/
public class ConsumerDLX {
public static final String EXCHANGE_NAME = "test_normal_exchange";
public static final String QUEUE_NAME = "test_normal_queue";
public static final String ROUTINGKEY = "dlx.#";
public static final String DLX_EXCHANGE_NAME = "test_dlx_exchange";
public static final String DLX_QUEUE_NAME = "test_dlx_queue";
public static final String DLX_ROUTINGKEY = "#";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//正常交互机+队列
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
//死信
channel.exchangeDeclare(DLX_EXCHANGE_NAME, "topic");
channel.queueDeclare(DLX_QUEUE_NAME, true, false, false, null);
channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, DLX_ROUTINGKEY);
//读取正常消息
/*channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("-----consumerTag: "+consumerTag+"\t"+"-----result: "+new String(body,"utf-8"));
}
});*/
//读取死信队列的消息
channel.basicConsume(DLX_QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("DLX_QUEUE_NAME-----result: " + new String(body, "utf-8"));
}
});
}
}
2、死信队列有什么用途?
消息的延时投递
将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列
3、RabbitMQ的消息可靠性投递和消息确认机制,如何保证消息不丢丢,100%投递
确认并且保证消息被送达
怎么保证 MQ 消息不丢失?
1 生产者弄丢了数据
RabbitMQ 生产者将数据发送到 RabbitMQ 的时候,可能数据在网络传输中搞丢了,这个时候 RabbitMQ 收不到消息,消息就丢了。RabbitMQ 提供了两种方式来解决这个问题:
1.1 事务机制:在生产者发送消息之前通过channel.txSelect开启一个事务,接着发送消息。如果消息没有成功被 RabbitMQ 接收到,生产者会收到异常,此时就可以进行事务回滚channel.txRollback,然后重新发送。假如 RabbitMQ 收到了这个消息,就可以提交事务channel.txCommit。但是这样一来,生产者的吞吐量和性能都会降低很多,现在一般不这么干。
1.2 Confirm机制:这个Confirm模式是在生产者侧设置的,每次写消息的时候会分配一个唯一的ID之后RabbitMQ收到之后会回传一个 ACK,告诉生产者这个消息OK了。如果RabbitMQ没有处理到这个消息,那么就回调一个Nack的接口,这个时候生产者就可以重发。
事务机制和 Confirm 机制最大的不同在于事务机制是同步的,提交一个事务之后会阻塞在那儿。
但是 Confirm 机制是异步的,发送一个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用 Confirm 机制的。
2 RabbitMQ服务器弄丢了数据
RabbitMQ 集群也会弄丢消息,这个问题在官方文档的教程中也提到过,就是说在消息发送到 RabbitMQ 之后,默认是没有落地磁盘的,万一 RabbitMQ 宕机了,这个时候消息就丢失了。所以为了解决这个问题,RabbitMQ 提供了一个持久化的机制,消息写入之后会持久化到磁盘。这样哪怕是宕机了,恢复之后也会自动恢复之前存储的数据,这样的机制可以确保消息不会丢失。
设置持久化有两个步骤:
第一个是创建 Queue 的时候将其设置为持久化的,这样就可以保证 RabbitMQ 持久化 Queue 的元数据,但是不会持久化 Queue 里的数据。
第二个是发送消息的时候将消息自身的deliveryMode属性设置为2,将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
但是这样一来可能会有人说:万一消息发送到 RabbitMQ 之后,还没来得及持久化到磁盘就挂掉了,数据也丢失了,怎么办?对于这个问题,其实是配合上面的 Confirm 机制一起来保证的,就是在消息持久化到磁盘之后才会给生产者发送 ACK 消息。万一真的遇到了那种极端的情况,生产者是可以感知到的,此时生产者可以通过重试发送消息给别的 RabbitMQ 节点。
消费端弄丢了数据
RabbitMQ 消费端弄丢了数据的情况是这样的:在消费消息的时候,刚拿到消息,结果进程挂了,这个时候 RabbitMQ 就会认为你已经消费成功了,这条数据就丢了。
对于这个问题,要先说明一下 RabbitMQ 消费消息的机制:在消费者收到消息的时候,会发送一个 ACK 给 RabbitMQ,告诉 RabbitMQ 这条消息被消费到了,这样 RabbitMQ 就会把消息删除。但是默认情况下这个发送 ACK 的操作是自动提交的,也就是说消费者一收到这个消息就会自动返回 ACK 给 RabbitMQ,所以会出现丢消息的问题。
所以针对这个问题的解决方案就是:关闭 RabbitMQ 消费者的自动提交ACK,在消费者处理完这条消息之后再手动提交ACK。这样即使遇到了上面的情况,RabbitMQ 也不会把这条消息删除,会在你程序重启之后,重新下发这条消息过来。
1、AMQP事务模式
事务模式
java
channel.txSelect();
channel.txCommit();
channel.txRollback();
生产者
java
package com.xue.tx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xue.util.ConnectionUtil;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 11:29
*/
public class ProducerTX {
public static final String EXCHANGE_NAME = "test_tx_exchange";
public static final String ROUTINGKEY = "tx.update";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//开启AMQP事务机制
try {
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, "hello tx msg".getBytes());
int i = 10 / 0;
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
} finally {
channel.close();
connection.close();
System.out.println("-------生产者发送完成");
}
}
}
消费者
java
package com.xue.tx;
import com.rabbitmq.client.*;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:02
*/
public class ConsumerTX {
public static final String EXCHANGE_NAME = "test_tx_exchange";
public static final String QUEUE_NAME = "test_tx_queue";
public static final String ROUTINGKEY = "tx.#";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-----result: " + new String(body));
}
});
}
}
2、Confirm确认
只在生产侧配置
channel开启确认模式
java
channel.confirmSelect()
channel开启监听器
java
addConfirmListener(ConfirmListener listener)
先启动消费者,再启动生产者: 消费者打印出消费成功的信息,然后重新发送消息,生产者接收到确认消费成功的消息后,打印出确认信息。
生产者
java
package com.xue.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:03
*/
public class ProducerConfirm {
public static final String EXCHANGE_NAME = "test_confirm_exchange";
public static final String ROUTINGKEY = "confirm.update";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
final long start = System.currentTimeMillis();
//添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息
channel.addConfirmListener(new ConfirmListener() {
//返回成功的回调函数
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("succuss ack" + "\t" + "multiple: " + multiple);
System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
}
//返回失败的回调函数
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("error ack");
System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
}
});
for (int i = 0; i < 5; i++) {
String msg = "this is confirm msg " + i;
channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, msg.getBytes());
}
}
}
消费者
java
package com.xue.confirm;
import com.rabbitmq.client.*;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:04
*/
public class ConsumerConfirm {
public static final String EXCHANGE_NAME = "test_confirm_exchange";
public static final String QUEUE_NAME = "test_confirm_queue";
public static final String ROUTINGKEY = "confirm.#";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("------result: " + new String(body, "utf-8"));
}
});
}
}
服务器:开启持久化
消费者:关闭自动签收,ack=false
4、ACK机制了解吗?重回队列如何操作?requeue
生产者
java
package com.xue.ackrequeue;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xue.util.ConnectionUtil;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:06
*/
public class ProducerACKRequeue {
public static final String EXCHANGE_NAME = "test_ack_exchange";
public static final String ROUTINGKEY = "ack.update";
public static void main(String[] args) throws Exception {
try (Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel()) {
for (int i = 1; i <= 5; i++) {
String str = "hello ack msg:" + i;
Map<String, Object> map = new HashMap<>();
map.put("myNumber", i);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).headers(map).build();
channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, props, str.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者
java
package com.xue.ackrequeue;
import com.rabbitmq.client.*;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:06
*/
public class ConsumerACKRequeue {
public static final String EXCHANGE_NAME = "test_ack_exchange";
public static final String QUEUE_NAME = "test_ack_queue";
public static final String ROUTINGKEY = "ack.#";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
//手动签收,autoACK属性设置为false
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if ((Integer) properties.getHeaders().get("myNumber") == 4) {
System.out.println("----------消费消息出问题,请重试或者联系管理员");
//basicNack(long deliveryTag, boolean multiple, boolean requeue)
// boolean requeue = true 就是重新回到队列,再次投递
channel.basicNack(envelope.getDeliveryTag(), false, false);
//每次间隔2秒钟重试
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println("------result: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
System.out.println("------------consumer come here");
}
}
演示重回队列,打爆CPU,^_^
消费者2
java
package com.xue.ackrequeue;
import com.rabbitmq.client.*;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:07
*/
public class ConsumerACKRequeue2 {
public static final String EXCHANGE_NAME = "test_ack_exchange";
public static final String QUEUE_NAME = "test_ack_queue";
public static final String ROUTINGKEY = "ack.#";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Integer myNumber = (Integer) properties.getHeaders().get("myNumber");
try {
if (myNumber == 4) {
int age = 10 / 0;
}
System.out.println("-----result: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
if (envelope.isRedeliver()) {
System.out.println("消息已重复处理失败,拒绝再次接收...");
channel.basicReject(envelope.getDeliveryTag(), false); // 拒绝消息
} else {
System.out.println("消息即将再次返回队列处理...");
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
e.printStackTrace();
}
}
});
System.out.println("------------Consumer_ACKRequeue come here");
}
}
优化重回队列
java
channel.basicNack 与 channel.basicReject 的区别在于
basicNack可以拒绝多条消息,
而basicReject一次只能拒绝一条消息
5、MQ丢消息你怎么办?对于不可达消息或者无人认领的消息,你如何处理?return应答听说过吗?
监听哪些不可路由的消息,比如路由词对不上不匹配,没有一个交换机收到过该消息。但是生产者确实发送了,这些消息没人认领,如何监听这些不可达的消息?
活要见人,人要见尸
如何追踪不可达消息
只在生产侧配置
java
Mandatory属性设置为true
添加ReturnListener接口
存在交换机的前提下,Mandatory属性设置为true
true ReturnListener接口会监听到路由不可达信息
false MQ服务器自动删除消息,无人管理
添加ReturnListener接口
java
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.rabbitmq.client;
import java.io.IOException;
public interface ReturnListener {
void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body) throws IOException;
}
channel.addReturnListener(new ReturnListener(){......})
生产者
java
package com.xue.returns;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:10
*/
public class ProducerReturn {
public static final String EXCHANGE_NAME = "test_return_exchange";
public static final String ROUTINGKEY = "return.update";
public static final String ROUTINGKEYERROR = "error.return.update";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicPublish(EXCHANGE_NAME, ROUTINGKEYERROR, true, null, "hello return msg".getBytes());
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-----replyCode: " + replyCode);
System.out.println("-----replyText: " + replyText);
System.out.println("-----exchange: " + exchange);
System.out.println("-----routingKey: " + routingKey);
System.out.println("-----msg body: " + new String(body));
}
});
}
}
消费者
java
package com.xue.returns;
import com.rabbitmq.client.*;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:11
*/
public class ConsumerReturn {
public static final String EXCHANGE_NAME = "test_return_exchange";
public static final String QUEUE_NAME = "test_return_queue";
public static final String ROUTINGKEY = "return.#";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("---------result: " + new String(body));
}
});
}
}
6、消息堆积如何处理?消息挤压很多很多,消费端一开启后,直接打爆MQ,如何做消费端限流???
提示,生产端是互联网客户,你不能要求生产者暂停一下。
只能在消费端侧配置
消费端限流策略实现
QoS的英文全称为"Quality of Service"
channel.basicQos();
生产者
java
package com.xue.flowlimit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xue.util.ConnectionUtil;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:12
*/
public class ProducerFlowLimit {
public static final String EXCHANGE_NAME = "test_qos_exchange";
public static final String ROUTINGKEY = "qos.update";
public static void main(String[] args) throws Exception {
try (Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();) {
for (int i = 0; i < 5; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, "flow limit msg: ".getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者
java
package com.xue.flowlimit;
import com.rabbitmq.client.*;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:13
*/
public class ConsumerFlowLimit {
public static final String EXCHANGE_NAME = "test_qos_exchange";
public static final String QUEUE_NAME = "test_qos_queue";
public static final String ROUTINGKEY = "qos.#";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
/**
*消费端限流操作:QoS的英文全称为"Quality of Service"
*
*1 basicQos(int prefetchSize, int prefetchCount, boolean global)
*
* 1.1 prefetchSize,消息大小,消息本身的大小 如果设置为零对消息本身的大小不限制,一般设置为零
* 1.2 prefetchCount,处理峰值,告诉rabbitmq不要一次性给消费者推送大于prefetchCount个消息,一般设置为1,处理完一条再来一条
* 1.3 global,应用范围 ,是否将上面的设置应用于整个通道,一般设置为false
* false:表示只应用于当前消费者
* true:表示当前通道的所有消费者都应用这个限流策略
*
*2 basicConsume方法里面的 autoAck必须设置为false,非自动签收
*
*3 channel.basicAck(envelope.getDeliveryTag(),false);
* boolean multiple, 代表是否批量处理,我们前面prefetchCount设置为1,表示一条条做,所以multiple一般设置为false
*/
channel.basicQos(0, 1, false);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//下面注释后可以看到积压效果
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("---------result: " + new String(body));
}
});
}
}
正常版
java
package com.xue.flowlimit;
import com.rabbitmq.client.*;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:14
*/
public class ConsumerFlowLimit2 {
public static final String EXCHANGE_NAME = "test_qos_exchange";
public static final String QUEUE_NAME = "test_qos_queue";
public static final String ROUTINGKEY = "qos.#";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
/**
*消费端限流操作:QoS的英文全称为"Quality of Service"
*
*1 basicQos(int prefetchSize, int prefetchCount, boolean global)
*
* 1.1 prefetchSize,消息大小,消息本身的大小 如果设置为零对消息本身的大小不限制,一般设置为零
* 1.2 prefetchCount,处理峰值,告诉rabbitmq不要一次性给消费者推送大于prefetchCount个消息,一般设置为1,处理完一条再来一条
* 1.3 global,应用范围 ,是否将上面的设置应用于整个通道,一般设置为false
* false:表示只应用于当前消费者
* true:表示当前通道的所有消费者都应用这个限流策略
*
*2 basicConsume方法里面的 autoAck必须设置为false,非自动签收
*
*3 channel.basicAck(envelope.getDeliveryTag(),false);
* boolean multiple, 代表是否批量处理,我们前面prefetchCount设置为1,表示一条条做,所以multiple一般设置为false
*/
channel.basicQos(0, 1, false);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//下面注释后可以看到积压效果
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("---------result: " + new String(body));
}
});
}
}
限流演示
java
package com.xue.flowlimit;
import com.rabbitmq.client.*;
import com.xue.util.ConnectionUtil;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:14
*/
public class ConsumerFlowLimit3 {
public static final String EXCHANGE_NAME = "test_qos_exchange";
public static final String QUEUE_NAME = "test_qos_queue";
public static final String ROUTINGKEY = "qos.#";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
/**
*消费端限流操作:QoS的英文全称为"Quality of Service"
*
*1 basicQos(int prefetchSize, int prefetchCount, boolean global)
*
* 1.1 prefetchSize,消息大小,消息本身的大小 如果设置为零对消息本身的大小不限制,一般设置为零
* 1.2 prefetchCount,处理峰值,告诉rabbitmq不要一次性给消费者推送大于prefetchCount个消息,一般设置为1,处理完一条再来一条
* 1.3 global,应用范围 ,是否将上面的设置应用于整个通道,一般设置为false
* false:表示只应用于当前消费者
* true:表示当前通道的所有消费者都应用这个限流策略
*
*2 basicConsume方法里面的 autoAck必须设置为false,非自动签收
*
*3 channel.basicAck(envelope.getDeliveryTag(),false);
* boolean multiple, 代表是否批量处理,我们前面prefetchCount设置为1,表示一条条做,所以multiple一般设置为false
*/
channel.basicQos(0, 2, false);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-----result: " + new String(body));
//basicAck 被注释掉了
}
});
}
}
图说
生产者一次性发送了5条
消费端限流小总结
3个配置细节
7、什么是过期消息?设置上有哪些要求和特点
1 是什么
Time-To-Live ExtensionsRabbitMQ允许为消息或者队列设置TTL(time to live)也就是过期时间。
TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。
当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。
如果既配置了消息的TTL(expiration),又配置了队列的TTL(x-message-ttl),那么较小的那个值会被取用。
在时间有效期内可以被合理消费,超时后msg将被rabbitmq自动删除,过时不候。
2 对于TTL的设置,rabbitmq有两种设置方式
2.1 对队列设置,那整个队列里面的消息都是一视同仁,整体一致。
2.2 对消息设置,针对某条消息进行单独设置,每条都可以不同,各取所需。
3 假如上诉两个都设置了,那过期时间以TTL小的为准,谁小听谁的。
4 如果超过TTL规定时间的消息就称为dead message。将会被投递到死信队列,消费者就无法再收到该消息,超时消息爽,跑步火葬场。
最后复习一下什么是死信:https://www.rabbitmq.com/dlx.html
单独给队列设置
单独给某条消息设置
谁小听谁的
官网 https://www.rabbitmq.com/ttl.html
8、消息重试机制你谈谈?rabbitmq默认重试几次?如何修改重试次数?-boot
pom.xml
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xue</groupId>
<artifactId>spring-boot-rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--web+actuator+swagger-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--SpringBoot与Rabbitmq整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--SpringBoot与Redis整合依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--mybatis+mysql+druid依赖-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.3</version>
</dependency>
<!--mysql驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--druid-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.23</version>
</dependency>
<!--jackson-->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
实体类
java
package com.xue.entity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:18
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
@ApiModel("订单实体类信息")
public class Order implements Serializable {
@ApiModelProperty("订单ID")
private Long id;
@ApiModelProperty("订单名称")
private String orderName;
@ApiModelProperty("建单时间")
private Date createTime;
@ApiModelProperty("订单UUID")
private String orderUUID;
@ApiModelProperty("订单状态")
private Integer orderStatus;
@ApiModelProperty("业务类型")
private String bizType;
}
mapper
java
package com.xue.mapper;
import com.xue.entity.Order;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:28
*/
@Mapper
public interface OrderMapper {
List<Order> listAll();
int add(Order order);
Order getOrderById(@Param("id") Long id);
Order getOrderByOrderUUID(@Param("orderUUID") String orderUUID);
}
JsonUtils
java
package com.xue.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.type.JavaType;
import org.codehaus.jackson.type.TypeReference;
import java.io.IOException;
import java.text.SimpleDateFormat;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:29
*/
@Slf4j
public class JsonUtils {
private static ObjectMapper objectMapper = new ObjectMapper();
static {
// 对象字段全部列入
objectMapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_DEFAULT);
// 取消默认转换timestamps形式
objectMapper.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS,false);
// 忽略空bean转json的错误
objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS,false);
// 统一日期格式yyyy-MM-dd HH:mm:ss
objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
// 忽略在json字符串中存在,但是在java对象中不存在对应属性的情况
objectMapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false);
}
public static <T> String obj2String(T obj){
if (obj == null){
return null;
}
try {
return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj);
} catch (Exception e) {
log.warn("Parse object to String error",e);
return null;
}
}
public static <T> String obj2StringPretty(T obj){
if (obj == null){
return null;
}
try {
return obj instanceof String ? (String) obj : objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj);
} catch (Exception e) {
log.warn("Parse object to String error",e);
return null;
}
}
public static <T> T string2Obj(String str,Class<T> clazz){
if (StringUtils.isEmpty(str) || clazz == null){
return null;
}
try {
return clazz.equals(String.class)? (T) str :objectMapper.readValue(str,clazz);
} catch (IOException e) {
log.warn("Parse String to Object error",e);
return null;
}
}
public static <T> T string2Obj(String str, TypeReference<T> typeReference){
if (StringUtils.isEmpty(str) || typeReference == null){
return null;
}
try {
return (T)(typeReference.getType().equals(String.class)? str :objectMapper.readValue(str,typeReference));
} catch (IOException e) {
log.warn("Parse String to Object error",e);
return null;
}
}
public static <T> T string2Obj(String str,Class<?> collectionClass,Class<?>... elementClasses){
JavaType javaType = objectMapper.getTypeFactory().constructParametricType(collectionClass,elementClasses);
try {
return objectMapper.readValue(str,javaType);
} catch (IOException e) {
log.warn("Parse String to Object error",e);
return null;
}
}
}
主要是消费者侧
此时默认
重试机制:默认无限次
签收机制:默认ack=true
正常演示
RedisConfig
java
package com.xue.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.io.Serializable;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:45
*/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory)
{
RedisTemplate<String,Serializable> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
//key序列化
redisTemplate.setKeySerializer(new StringRedisSerializer());
//value序列化
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
//value hashmap序列化
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/*
//redisTemplate注入到Spring容器,老技术用RedisConnectionFactory
@Bean
public RedisTemplate<String,String> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String,String> redisTemplate=new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
redisTemplate.setConnectionFactory(factory);
//key序列化
redisTemplate.setKeySerializer(redisSerializer);
//value序列化
redisTemplate.setValueSerializer(redisSerializer);
//value hashmap序列化
redisTemplate.setHashKeySerializer(redisSerializer);
//key hashmap序列化
redisTemplate.setHashValueSerializer(redisSerializer);
return redisTemplate;
}*/
}
SwaggerConfig
java
package com.xue.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:46
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Value("${spring.swagger2.enabled}")
private Boolean enabled;
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.enable(enabled)
.select()
.apis(RequestHandlerSelectors.basePackage("com.xue"))
.paths(PathSelectors.any())
.build();
}
public ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("swagger2构建api接口文档 " + "\t" + new SimpleDateFormat("yyyy-MM-dd").format(new Date()))
.description("rabbitmq")
.version("1.0")
.termsOfServiceUrl("https://www.bilibili.com/video/BV1Hy4y1B78T")
.build();
}
}
java
package com.xue.service;
import com.xue.entity.Order;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:37
*/
public interface OrderService {
List<Order> listAll();
int add(Order order);
Order getOrderById(@Param("id") Long id);
}
java
package com.xue.service;
import com.xue.entity.Order;
import com.xue.mapper.OrderMapper;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:41
*/
@Service
@Transactional(propagation = Propagation.REQUIRES_NEW)
public class OrderServiceImpl implements OrderService{
@Resource
private OrderMapper orderMapper;
@Override
public List<Order> listAll() {
return orderMapper.listAll();
}
@Override
public int add(Order order) {
return orderMapper.add(order);
}
@Override
public Order getOrderById(Long id) {
return orderMapper.getOrderById(id);
}
}
生产者
java
package com.xue.controller;
import com.xue.entity.Order;
import com.xue.service.OrderService;
import com.xue.util.JsonUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:36
*/
@RestController
@Slf4j
@Api(description = "订单接口")
public class OrderController {
@Resource
private OrderService orderService;
@Resource
private RedisTemplate redisTemplate;
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/list")
@ApiOperation("返回全部订单")
public List<Order> list() {
return orderService.listAll();
}
@GetMapping("/get/{id}")
@ApiOperation("按照ID查询出订单")
public Order get(@PathVariable("id") Long id) {
return orderService.getOrderById(id);
}
@ApiOperation("新增订单同时插入mysql+redis+RabbitMQ")
@PostMapping(value = "/order/add2")
public Order add2(@RequestBody Order order) throws Exception {
//1 往数据库插入记录
int result = orderService.add(order);
if (result > 0) {
//2 往redis里插入记录
redisTemplate.opsForValue().set(order.getId() + "", order);
//3 往RabbitMQ里插入数据,以便快递员delivery库可以对下单进行配送
rabbitTemplate.convertAndSend("boot_normal_exchange", "normal.add",
JsonUtils.obj2String(order), message -> {
message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
message.getMessageProperties().setContentEncoding("utf-8");
message.getMessageProperties().setExpiration("600000");
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setMessageId(order.getOrderUUID());
return message;
});
}
log.info("result to mysql: {}", order);
log.info("result to redis: {}", redisTemplate.opsForValue().get(order.getId() + ""));
log.info("result to rabbitmq: {}", String.valueOf(order.getId()));
return order;
}
}
/**
* @GetMapping("/producer/{routeKey}") public String producerMsg(@PathVariable String routeKey)
* {
* String msg = UUID.randomUUID().toString()+"----10秒钟后过期";
* //rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,routeKey,msg);
* <p>
* MessageProperties messageProperties = new MessageProperties();
* messageProperties.setExpiration("10000");
* <p>
* Message message = new Message(msg.getBytes(),messageProperties);
* rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,routeKey,message);
* <p>
* <p>
* return "发送消息成功:"+" routeKey:"+routeKey+"\t msg: "+msg;
* }
**/
消费者
java
package com.xue.service;
import com.rabbitmq.client.Channel;
import com.xue.entity.Order;
import com.xue.mapper.OrderMapper;
import com.xue.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:16
*/
@Service
@Slf4j
public class OrderConsumerService {
@Resource
private OrderMapper orderMapper;
@RabbitListener(queues = "boot_normal_queue")
public void getOrderInfo(Message message, Channel channel) throws IOException {
String orderUUID = message.getMessageProperties().getMessageId();
if (orderUUID == null) {
throw new RuntimeException("----全局订单UUID为空,非法订单");
}
// 从RabbitMQ中获得客户下单信息,orderUUID作为下单唯一号让快递库获取并投递,
// 下单+快递共用同一个t_order表,以bizType区分
Order orderDeliveryEntity = JsonUtils.string2Obj(new String(message.getBody()), Order.class);
orderDeliveryEntity.setBizType("delivery");
//1.1 没有int age=10/0就是正常,ack从默认自动
//1.2 加上int age=10/0,消费端出错,会自动触发RabbitMQ的重试机制,默认是无限次重复
try {
log.info("-----从RabbitMQ中获得的订单信息: " + orderUUID + "\t" + new String(message.getBody()));
int result = orderMapper.add(orderDeliveryEntity);
if (result > 0) {
log.info("-------快递员开始派送订单:" + orderUUID);
}
} catch (Exception e) {
log.error("将出错的消息记录到mysql或者日志中,后续人工补偿" + "\t" + e.getMessage());
throw new RuntimeException("将出错消息记录到mysql或者日志中,后续人工补偿");
}
}
}
异常演示
消费者
java
package com.xue.service;
import com.rabbitmq.client.Channel;
import com.xue.entity.Order;
import com.xue.mapper.OrderMapper;
import com.xue.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:31
*/
@Service
@Slf4j
public class OrderConsumerService2 {
@Resource
private OrderMapper orderMapper;
@RabbitListener(queues = "boot_normal_queue")
public void getOrderInfo(Message message, Channel channel) throws IOException {
String orderUUID = message.getMessageProperties().getMessageId();
if (orderUUID == null) {
throw new RuntimeException("----全局订单UUID为空,非法订单");
}
// 从RabbitMQ中获得客户下单信息,orderUUID作为下单唯一号让快递库获取并投递,
// 下单+快递共用同一个t_order表,以bizType区分
Order orderDeliveryEntity = JsonUtils.string2Obj(new String(message.getBody()), Order.class);
orderDeliveryEntity.setBizType("delivery");
//1.1 没有int age=10/0就是正常,ack从默认自动
//1.2 加上int age=10/0,消费端出错,会自动触发RabbitMQ的重试机制,默认是无限次重复
try {
log.info("-----从RabbitMQ中获得的订单信息: " + orderUUID + "\t" + new String(message.getBody()));
int result = orderMapper.add(orderDeliveryEntity);
if (result > 0) {
log.info("-------快递员开始派送订单:" + orderUUID);
int age = 10 / 0;
}
} catch (Exception e) {
log.error("将出错的消息记录到mysql或者日志中,后续人工补偿" + "\t" + e.getMessage());
throw new RuntimeException("将出错消息记录到mysql或者日志中,后续人工补偿");
}
}
}
消费端如果消费消息出现异常,默认是无限次重复
这是不可以的,建议设置规则,比如重试5次后还是失败,记录下来后续人工干预
解决1
设置重试次数和重试间隔相关配置
properties
server.port=7777
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms
签收机制修改为手动
properties
#开启重试机制,重试5次,每次间隔5秒
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms
#
##设置签收机制为手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费者修改1
java
package com.xue.service;
import com.rabbitmq.client.Channel;
import com.xue.entity.Order;
import com.xue.mapper.OrderMapper;
import com.xue.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:33
*/
@Service
@Slf4j
public class OrderConsumerService3 {
@Resource
private OrderMapper orderMapper;
@RabbitListener(queues = "boot_normal_queue")
public void getOrderInfo(Message message, Channel channel) throws IOException {
String orderUUID = message.getMessageProperties().getMessageId();
if (orderUUID == null) {
throw new RuntimeException("----全局订单UUID为空,非法订单");
}
// 从RabbitMQ中获得客户下单信息,orderUUID作为下单唯一号让快递库获取并投递,
// 下单+快递共用同一个t_order表,以bizType区分
Order orderDeliveryEntity = JsonUtils.string2Obj(new String(message.getBody()), Order.class);
orderDeliveryEntity.setBizType("delivery");
//2 配置5次重复,每次间隔3秒 + ack手动签收
/**
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms
#设置签收机制为手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
*/
try {
log.info("-----从RabbitMQ中获得的订单信息: " + orderUUID + "\t" + new String(message.getBody()));
int result = orderMapper.add(orderDeliveryEntity);
if (result > 0) {
log.info("-------快递员开始派送订单:" + orderUUID);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
int age = 10 / 0;
}
} catch (Exception e) {
log.error("将出错的消息记录到mysql或者日志中,后续人工补偿" + "\t" + e.getMessage());
throw new RuntimeException("将出错消息记录到mysql或者日志中,后续人工补偿");
}
}
}
测试通过
不再无限次重复,按照设置重试5次,每次间隔3秒钟
问题2
不再无限次重试了,设置为5次,依旧插入mysql多条记录,幂等性问题需要解决
9、消息幂等性问题,如何保证消息不被重复消费-boot
什么是幂等性
什么是幂等性
幂等是一个数学与计算机学概念,在数学中某一元运算为幂等时,其作用在任一元素两次后会和其作用一次的结果相同。
在计算机中编程中,一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
幂等函数或幂等方法是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。在HTTP/1.1中,对幂等性进行了定义。它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外),即第一次请求的时候对资源产生了作用,但是以后的多次请求都不会再对资源产生作用。
为什么需要实现幂等性
前端重复提交表单
在填写一些表格时候,用户填写完成提交,很多时候会因网络波动没有及时对用户做出提交成功响应,致使用户认为没有成功提交,然后一直点提交按钮,这时就会发生重复提交表单请求。
恶意用户进行刷单
例如在实现用户投票这种功能时,如果用户针对一个用户进行重复提交投票,这样会导致接口接收到用户重复提交的投票信息,这样会使投票结果与事实严重不符。
接口超时重复提交
很多时候 HTTP 客户端工具都默认开启超时重试的机制,尤其是第三方调用接口时候,为了防止网络波动超时等造成的请求失败,都会添加重试机制,导致一个请求提交多次。
消息进行重复消费
当使用 MQ 消息中间件时候,如果发生消息中间件出现错误未及时提交消费信息,导致发生重复消费。
如何实现幂等性
数据库唯一主键
需要生成全局唯一主键 ID
数据库唯一主键的实现主要是利用数据库中主键唯一约束的特性,一般来说唯一主键比较适用于“插入”时的幂等性,其能保证一张表中只能存在一条带该唯一主键的记录。
使用数据库唯一主键完成幂等性时需要注意的是,该主键一般来说并不是使用数据库中自增主键,而是使用分布式 ID 充当主键,这样才能能保证在分布式环境下 ID 的全局唯一性。
防重 Token 令牌
方案描述:
针对客户端连续点击或者调用方的超时重试等情况,例如提交订单,此种操作就可以用 Token 的机制实现防止重复提交。
简单的说就是调用方在调用接口的时候先向后端请求一个全局 ID(Token),请求的时候携带这个全局 ID 一起请求(Token 最好将其放到 Headers 中),后端需要对这个 Token 作为 Key,用户信息作为 Value 到 Redis 中进行键值内容校验,如果 Key 存在且 Value 匹配就执行删除命令,然后正常执行后面的业务逻辑。如果不存在对应的 Key 或 Value 不匹配就返回重复执行的错误信息,这样来保证幂等操作。
使用全局MessageID判断让消费者都消费同一个
方案描述:
所谓请求序列号,其实就是每次向服务端请求时候附带一个短时间内唯一不重复的序列号,该序列号可以是一个有序 ID,也可以是一个订单号,一般由下游生成,在调用上游服务端接口时附加该序列号和用于认证的 ID。当上游服务器收到请求信息后拿取该 序列号 和下游 认证ID 进行组合,形成用于操作 Redis 的 Key,然后到 Redis 中查询是否存在对应的 Key 的键值对,根据其结果:
如果存在,就说明已经对该下游的该序列号的请求进行了业务处理,这时可以直接响应重复请求的错误信息。
如果不存在,就以该 Key 作为 Redis 的键,以下游关键信息作为存储的值(例如下游商传递的一些业务逻辑信息),将该键值对存储到 Redis 中 ,然后再正常执行对应的业务逻辑即可。
消费者修改2-解决幂等性
java
package com.xue.service;
import com.rabbitmq.client.Channel;
import com.xue.entity.Order;
import com.xue.mapper.OrderMapper;
import com.xue.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
/**
* @Author: xueqimiao
* @Date: 2022/5/9 14:35
*/
@Service
@Slf4j
public class OrderConsumerService4 {
@Resource
private OrderMapper orderMapper;
@RabbitListener(queues = "boot_normal_queue")
public void getOrderInfo(Message message, Channel channel) throws IOException {
String orderUUID = message.getMessageProperties().getMessageId();
if (orderUUID == null) {
throw new RuntimeException("----全局订单UUID为空,非法订单");
}
/* 通过全局订单UUID进行判断,避免幂等性重复消费问题,多次插入数据库。
* 一般情况下面够用,考虑高并发,作业:请修改为通过reids进行存取*/
Order entity = orderMapper.getOrderByOrderUUID(orderUUID);
if (entity != null) {
log.info("快递订单delivery已经有该记录,不重复插");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
}
// 从RabbitMQ中获得客户下单信息,orderUUID作为下单唯一号让快递库获取并投递,
// 下单+快递共用同一个t_order表,以bizType区分
Order orderDeliveryEntity = JsonUtils.string2Obj(new String(message.getBody()), Order.class);
orderDeliveryEntity.setBizType("delivery");
//3 配置5次重复,每次间隔3秒 + ack手动签收+通过全局ID进行查询,避免幂等性问题。
try {
log.info("-----从RabbitMQ中获得的订单信息: " + orderUUID + "\t" + new String(message.getBody()));
int result = orderMapper.add(orderDeliveryEntity);
if (result > 0) {
log.info("-------快递员开始派送订单:" + orderUUID);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
int age = 10 / 0;
}
} catch (Exception e) {
log.error("将出错的消息记录到mysql或者日志中,后续人工补偿" + "\t" + e.getMessage());
throw new RuntimeException("将出错消息记录到mysql或者日志中,后续人工补偿");
}
}
}