Skip to content

消息的事务性

image-20220730151444865

决定事务的代码

java
Session session = connection.createSession(开启事务, 签收机制);

开启事务:true、false

签收机制:

  • Session.AUTO_ACKNOWLEDGE:自动签收
  • Session.CLIENT_ACKNOWLEDGE:手动签收
  • Session.SESSION_TRANSACTED:事务签收
  • Session.DUPS_OK_ACKNOWLEDGE:允许重复消费

生产者开启事务后,在 send 到消息队列时,必须执行 commit 方法,这批消息才真正的被提交。不执行 commit 方法,这批消息不会提交。执行 rollback 方法,之前的消息会回滚掉。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要。

消费者开启事务后,执行 commit 方法,这批消息才算真正的被消费。不执行 commit 方法,这些消息不会标记已消费,下次还会被消费,产生重复消费问题。执行 rollback 方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用 commit 和 rollback 方法,甚至能够违反一个消费者只能消费一次消息的原理。

总结:开启事务,必须在消息 send 后 commit,否则引起重复消费问题。

问:消费者和生产者需要同时操作事务才行吗?

答:消费者和生产者的事务,完全没有关联,各自是各自的事务。

事务性代码示例

生产者演示:(具体看)11 行代码和 21-26 行代码

java
public class JmsTXProduce {
    //  linux 上部署的 activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String QUEUE_NAME = "queue001";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 开启事务
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        // 非持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg --- " + i);
            producer.send(textMessage);
        }
       try {
            // 开启事务必须要 commit
            session.commit();
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        }
        
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到 MQ 完成");
    }
}

消费者演示:具体看 12 行代码和 31-37 行代码

java
public class JmsTXConsumer {

    //  linux 上部署的activemq 的 IP 地址 + activemq 的端口号(默认61616)
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String QUEUE_NAME = "queue001";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 开启事务
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message  && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费的消息:" + textMessage.getText());
                        textMessage.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();
        try {
            // 提交事务
            session.commit();
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        }
        consumer.close();
        session.close();
        connection.close();
    }
}

消息的签收机制

决定签收代码

java
Session session = connection.createSession(开启事务, 签收机制);

开启事务:true、false

签收机制:

  • Session.AUTO_ACKNOWLEDGE:自动签收。该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息
  • Session.CLIENT_ACKNOWLEDGE:手动签收。该种方式,需要我们手动调用Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收
  • Session.SESSION_TRANSACTED:事务签收。多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到
  • Session.DUPS_OK_ACKNOWLEDGE:允许重复消费。开始事务的情况下,可以使用该方式。该种方式很少使用到

签收机制代码示例

生产者演示:(具体看)11 行代码和 21-26 行代码

java
public class JmsTXProduce {
    //  linux 上部署的 activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String QUEUE_NAME = "queue001";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 开启事务
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        // 非持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg --- " + i);
            producer.send(textMessage);
        }
       try {
            // 开启事务必须要 commit
            session.commit();
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        }
        
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到 MQ 完成");
    }
}

消费者演示:手动签收,具体看 11 行代码和 22-26 行代码

java
public class JmsTXConsumer {

    public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
    public static final String QUEUE_NAME = "queue001";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 开启手动签收
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message  && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费的消息:" + textMessage.getText());
                        /* 
                            设置为 Session.CLIENT_ACKNOWLEDGE 后,要调用该方法,标志着该消息已被签收(消费)。
                            如果不调用该方法,该消息的标志还是未消费,下次启动消费者或其他消费者还会收到改消息。
                         */
                        textMessage.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();
        try {
            session.commit();
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        }
        consumer.close();
        session.close();
        connection.close();
    }
}

事务和签收的关系

  1. 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用
  2. 非事务性会话中,消息何时被确认取决于创建会话时的应答模式
  3. 生产者事务开启,只有commit后才能将全部消息变为已消费
  4. 事务偏向生产者,签收偏向消费者。也就是说,生产者使用事务更好点,消费者使用签收机制更好点

解决重复消费问题

  1. 开启事务后,消息 send 后需要 commit

    java
    producer.send(textMessage);
    session.commit();
  2. 开启手动签收,拿到消息后要调用 acknowledge

    java
    TextMessage textMessage = (TextMessage) message;
    System.out.println("拿到的消息:" + textMessage.getText());
    textMessage.acknowledge();

注意:默认重复消费次数到达 6 次后,则会将消息放入死信队列,即为病毒消息,不在消费该消息。具体看 ActiveMQ - 高级特性

事务和签收的区别

区别 1

  • 事务是消息发送(生产者)或者消息消费后(消费者)需要进行的确认操作,否则引起发送失败(生产者)或者重复消费(消费者)问题
  • 签收是消息生产后(生产者)或者消息消费后(消费者)需要进行的消息签收,否则引起制作失败或者重复消费(消费者)问题

区别 2

  • 事务针对的是消息的传递确认,即 Session
  • 签收针对的是消息的本身确认,即 Message

区别 3

  • 事务是对全部消息操作后,统一确认
  • 签收是对某一条消息操作后,一个一个确认

建议

  • 生产者使用事务,确保消息的传递后确认
  • 消费者使用签收,确保消息的消费后确认(手动签收)

JMS 的点对点总结

  1. 点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似

  2. 如果在 Session 关闭时有部分消息己被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,这些消息还会被再次接收

  3. 队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势

Broker是什么

相当于一个 ActiveMQ 服务器实例。说白了,Broker 其实就是实现了用代码的形式启动 ActiveMQ 将 MQ 嵌入到 Java 代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。

启动broker时指定配置文件

Linux 里启动的 ActiveMQ 可以指定配置文件

sh
activemq start xbean:file:ActiveMQ安装目录/conf/其他的 activemq.xml 文件

启动 Broker 时指定配置文件,可以帮助我们在一台服务器上启动多个 Broker。实际工作中一般一台服务器只启动一个 Broker。

image-20220730153200787

嵌入式的broker启动

用 ActiveMQ Broker 作为独立的消息服务器来构建 Java 应用。

ActiveMQ 也支持在 vm 中通信基于嵌入的 Broker,能够无缝的集成其他 Java 应用。

pom.xml 添加一个依赖

xml
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.1</version>
</dependency>

启动类

java
public class EmbedBroker {

    public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

这样也可以发布消息和消费消息。