Skip to content

什么是持久化消息

保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

在消息生产者将消息成功发送给 MQ 消息中间件之后。无论是出现任何问题,如:MQ 服务器宕机、消费者掉线等。都保证(Topic 要之前注册过,Queue 不用)消息消费者,能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。

Queue 消息非持久化和持久化

Queue 非持久,当服务器宕机,消息不存在(消息丢失了)。即便是非持久,消费者在不在线的话,消息也不会丢失,等待消费者在线,还是能够收到消息的。

Queue持久化,当服务器宕机,消息依然存在。Queue 消息默认是持久化的。

持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。

可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

image-20220730150634078

非持久化 生产者演示:具体看 13-14 行代码

当生产者成功发布消息之后,MQ 服务端宕机重启,消息生产者就收不到该消息了

java
public class JmsProduce {
    //  linux 上部署的activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String QUEUE_NAME = "queue001";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        // 非持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg --- " + i);
            producer.send(textMessage);
        }
        // 9.关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到 MQ 完成");
    }
}

持久化 生产者演示:具体看 13-14 行代码

当生产者成功发布消息之后,MQ 服务端宕机重启,消息生产者仍然能够收到该消息(默认)

java
public class JmsProduce {
    //  linux 上部署的 activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String QUEUE_NAME = "queue001";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        // 非持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg --- " + i);
            producer.send(textMessage);
        }
        // 关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到 MQ 完成");
    }
}

Topic 消息持久化

Topic 默认是 非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。

Topic 消息持久化,只要消费者向 MQ 服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是 MQ 服务器宕机还是消费者不在线。

注意

  • 一定要先运行一次消费者,等于向 MQ 注册,类似我订阅了这个主题
  • 然后再运行生产者发送消息。之后无论消费者是否在线,都会收到消息。如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来

生产者演示:具体看 13 - 16 行代码(把 start 换到持久化后面,代表持久化后再启动 ActiveMQ)

java
public class JmsProduceTopic {
    //  linux 上部署的 activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String TOPIC_NAME = "topic001";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer producer = session.createProducer(topic);

        // 设置持久化 Topic 
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 设置持久化 Topic 之后再启动连接
        connection.start();

        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg --- " + i);
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到 MQ 完成");
    }
}

消费者演示:具体看 11、13 - 17 行代码(先订阅 Topic 后再 start)

java
public class JmsConsumerTopic {

    //  linux 上部署的 activemq 的 IP 地址 + activemq 的端口号(默认61616)
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String TOPIC_NAME = "topic001";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        // 设置客户端 ID。向 MQ 服务器注册自己的名称
        connection.setClientID("marry");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        // 创建一个 Topic 订阅者对象。一参是 Topic,二参是订阅者名称
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "mark...");
        // 之后再开启连接
        connection.start();

        topicSubscriber.setMessageListener(message -> {
            if(null != message  && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
        topicSubscriber.close();
        session.close();
        connection.close();
    }
}

看 ActiveMQ 客户端,Topic 页面还是和之前的一样。另外在 subscribers 页面也会显示。如下:

image-20220730151008557

JMS 的发布订阅总结

JMS 的发布订阅总结

JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作 Topic。

主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。

主题使得消息订阅者和消息发布者保持互相独立,不需要解除即可保证消息的传送。

非持久订阅

非持久订阅只有当客户端处于激活状态,也就是和 MQ 保持连接状态才能收发到某个主题的消息。

如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。

一句话:先订阅注册才能接受到发布,只给订阅者发布消息。

持久订阅

客户端首先向 MQ 注册一个自己的身份 ID 识别号,当这个客户端处于离线时,生产者会为这个 ID 保存所有发送到主题的消息,当客户再次连接到 MQ 的时候,会根据消费者的 ID 得到所有当自己处于离线时发送到主题的消息

非持久订阅状态下,不能恢复或重新派送一个未签收的消息。

持久订阅才能恢复或重新派送一个未签收的消息。

非持久和持久化订阅如何选择

当所有的消息必须被接收,则用持久化订阅。当消息丢失能够被容忍,则用非持久订阅。