Skip to content

异步投递

ActiveMQ 支持同步,异步两种发送的模式将消息发送到 Broker,模式的选择对发送延时有巨大的影响。Producer 能达到怎么样的产出率(产出率 = 发送数据总量 / 时间)主要受发送延时的影响,使用异步发送可以显著提高发送的性能。

ActiveMQ 默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。

如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞 Producer 知道 Broker 返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。

很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。

异步发送

它可以最大化 Producer 端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升 Producer 性能;不过这也带来了额外的问题,是需要消耗更多的 Client 端内存同时也会导致 Broker 端性能消耗增加;此外它不能有效的确保消息的发送成功。在 userAsyncSend = true 的情况下客户端需要容忍消息丢失的可能。

总结:同步代表 send 后等待消息直到发到队列后才允许后面的消息发送。异步代表 send 后不管消息有没有发到队列,直接让后面的消息也发送。

代码

官网上 3 中代码实现:

img

具体看 3 行代码和 7 行代码 和 11 行代码

java
public class JmsProduce {
    //  方式1
    public static final String ACTIVE_URL = "nio://192.168.199.27:61616?jms.useAsyncSend=true";
    public static final String QUEUE_NAME = "queue001";
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 方式2
        activeMQConnectionFactory.setUseAsyncSend(true);
        Connection connection = activeMQConnectionFactory.createConnection();
        // 方式3
        ((ActiveMQConnection) connection).setUseAsyncSend(true);
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        // 持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg --- " + i);
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到 MQ 完成");
    }
}

消息如何确定发送成功

异步发送丢失消息的场景是:生产者设置 UseAsyncSend=true,使用 producer.send(msg) 持续发送消息。由于消息不阻塞,生产者会认为所有 send 的消息均被成功发送至 MQ。

如果 MQ 突然宕机,此时生产者端内存中尚未被发送至 MQ 的消息都会丢失。

所以,正确的异步发送方法是需要接收回调的。

同步发送和异步发送的区别就在此,同步发送等 send 不阻塞了就表示一定发送成功了,异步发送需要接收回执并由客户端再判断一次是否发送成功。

代码:具体看 13 行代码,18-30 行代码

java
public class JmsProduce {
    public static final String ACTIVE_URL = "nio://192.168.199.27:61616";
    public static final String QUEUE_NAME = "queue001";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        ((ActiveMQConnection) connection).setUseAsyncSend(true);
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        // MessageProducer producer = session.createProducer(queue);
        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
        // 持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg --- " + i);
            textMessage.setJMSMessageID(UUID.randomUUID().toString());
            String msgId = textMessage.getJMSCorrelationID();
            producer.send(textMessage, new AsyncCallback() {
                @Override
                public void onSuccess() {
                    System.out.println("成功发送消息Id:" + msgId);
                }
                @Override
                public void onException(JMSException exception) {
                    System.out.println("失败发送消息Id:" + msgId);
                }
            });
        }
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到 MQ 完成");
    }
}

控制台观察发送消息的信息:

img

img

延迟投递和定时投递

介绍

官网文档:http://activemq.apache.org/delay-and-schedule-message-delivery.html

img

Property_nametypedescription
AMQ_SCHEDULED_DELAYlong延迟投递的时间
AMQ_SCHEDULED_PERIODlong重复投递的时间
AMQ_SCHEDULED_REPEATint重复投递的次数
AMQ_SCHEDULED_CORNStringCorn 表达式

使用:修改 activemq.xml 配置文件

xml
</bean>
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"  schedulerSupport="true" >
<destinationPolicy>

image-20220730212605636

代码实现

生产者具体看 15-27 行代码,生产消息后,规定消息的投递属性

java
public class JmsProduce {
    public static final String ACTIVE_URL = "nio://192.168.199.27:61616";
    public static final String QUEUE_NAME = "queue001";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        ((ActiveMQConnection) connection).setUseAsyncSend(true);
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        // 持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        long delay =  10*1000;
        long period = 5*1000;
        int repeat = 3 ;
        try {
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                // 延迟的时间
                textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                // 重复投递的时间间隔
                textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
                // 重复投递的次数
                textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
                // 此处的意思:该条消息,等待 10 秒,之后每 5 秒发送一次,重复发送 3 次。
                producer.send(textMessage);
            }
            session.commit();
            System.out.println("消息发送到 MQ 完成");
        } catch (Exception e) {
            session.rollback();
            e.printStackTrace();
        } finally {
            producer.close();
            session.close();
            connection.close();
        }
    }
}

消费者代码:

java
public class Jms_TX_Consumer {

    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";

    private static final String ACTIVEMQ_QUEUE_NAME = "Schedule01";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {

            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                        textMessage.acknowledge();
                    } catch (Exception e) {
                        System.out.println("出现异常,消费失败,放弃消费");
                    }
                }
            }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

消息消费的重试机制

介绍

官网文档:http://activemq.apache.org/redelivery-policy

消费者收到消息,之后出现异常了,没有告诉 Broker 确认收到该消息,Broker 会尝试再将该消息发送给消费者。尝试 n 次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后 Broker 不会再将该消息发送给消费者。

具体哪些情况会引发消息重发

  • Client 用了 transactions 且再 session 中调用了 rollback
  • Client 用了 transactions 且再调用 commit 之前关闭或者没有 commit
  • Client 再 CLIENT_ACKNOWLEDGE 的传递模式下,session 中调用了 recover

重发时间间隔和重发次数

  • 间隔:1
  • 次数:6
  • 每秒发 6 次

有毒消息 Poison ACK

一个消息被 redelivedred 超过默认的最大重发次数(默认 6 次)时,消费的回个 MQ 发一个 poison ack 表示这个消息有毒,告诉 Broker 不要再发了。这个时候 Broker 会把这个消息放到 DLQ(私信队列)。

属性说明

  • collisionAvoidanceFactor:设置防止冲突范围的正负百分比,只有启用 useCollisionAvoidance 参数时才生效。也就是在延迟时间上再加一个时间波动范围。默认值为 0.15
  • maximumRedeliveries:最大重传次数,达到最大重连次数后抛出异常。为 -1 时不限制次数,为 0 时表示不进行重传。默认值为 6
  • maximumRedeliveryDelay:最大传送延迟,只在 useExponentialBackOff 为 true 时有效(V5.5),假设首次重连间隔为 10ms,倍数为 2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为 40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为 -1。
  • initialRedeliveryDelay:初始重发延迟时间,默认 1000L
  • redeliveryDelay:重发延迟时间,当 initialRedeliveryDelay = 0 时生效,默认 1000L
  • useCollisionAvoidance:启用防止冲突功能,默认 false
  • useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认 false
  • backOffMultiplier:重连时间间隔递增倍数,只有值大于 1 和启用 useExponentialBackOff 参数时才生效。默认是 5

演示

java
public class Jms_TX_Consumer {
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "dead01";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                        //session.commit();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        });
        //关闭资源
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

当事务开启未提交,签收手动开启未签收时,产生重复消费问题,当消息被重复消费 6 次以后,放入死信队列,无法被重复消费,准确说 7 次,因为第 1 次不算,则 6 次。

ActiveMQ 管理后台。多了一个名为 ActiveMQ.DLQ 队列,里面多了 3 条消息。

image-20220730213421105

基于上面代码,将修改重试次数为 3。更多的设置请参考官网文档。

java
public class Jms_TX_Consumer {
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "dead01";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 修改默认参数,设置消息消费重试3次
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(3);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                        // session.commit();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

整合 Spring

xml
<!-- 定义 ReDelivery(重发机制)机制 -->
<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <!-- 是否在每次尝试重新发送失败后,增长这个等待时间 -->
    <property name="useExponentialBackOff" value="true"></property>
    <!-- 重发次数,默认为 6 次这里没置为 3 次 -->
    <property name="maximumRedeliveries" value="3"></property><!-- 重发时间间隔,默认为 l 秒 -->
    <property name="initialRedeliveryDelay" value="1000"></property>
    <!-- 第一次失败后重新发送之前等待 500 亳秒,第二次失败再等待 500 * 2 毫秒,这里的 2 就是 vaLue --><property name="backOfFMultiplier" va1ue="2"></property>
    <!-- 最大传送延迟,只在 useExponentialBackOff 为 true 时有效(V5.5),假没首次重连间隔为 10ms 倍数为 2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为 40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔 -->
    <property name="maximumRedeliveryDelay" value="1000"></property>
</bean>
<!-- 创建连接工厂 -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"></property>
    <property name="redeliveryPolicy" ref=" activeNQRedeliveryPolicy"/> <!-- 引用重发机制- -->
</bean>

死信队列

官网文档:http://activemq.apache.org/redelivery-policy

介绍

死信队列:异常消息规避处理的集合,主要处理失败的消息。

ActiveMQ 中引入了 死信队列(Dead Letter Queue)的概念。即一条消息再被重发了多次后(默认为重发 6 次:redeliveryCounter == 6),将会被 ActiveMQ 移入「死信队列」。开发人员可以在这个 Queue 中查看处理出错的消息,进行人工干预。

image-20220730214132457

一般生产环境中在使用 MQ 的时候设计两个队列:一个是核心业务队列,一个是死信队列。

image-20220730214211293

核心业务队列,就是比如上图专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。

假如第三方物流系统故障了此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问或者标志位处理失败。一旦标志这条消息处理失败了之后,MQ 就会把这条消息转入提前设置好的一个死信队列中。然后你会看到的就是,在第三方物流系统故障期间,所有订单消息全部处理失败,全部会转入死信队列。然后你的仓储系统得专门有一个后台线程,监控第三方物流系统是否正常,能否请求的,不停的监视。一旦发现对方恢复正常,这个后台线程绒从死信队列消费出来处理失败的订单,重新执行发货和配送的通知逻辑。

配置(一般采用默认)

sharedDeadLetterStrategy

不管是 Queue 还是 Topic,失败的消息都放到这个队列中。下面修改 activemq.xml 的配置,可以达到修改队列的名字。

xml
<deadLetterStrategy>
    <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>

将所有的 DeadLetter 保存在一个共享的队列中,这是 ActiveMQ Broker 端默认的策略。共享队列默认为 ActiveMQ.DLQ,可以通过 deadLetterQueue 属性来设定。

individualDeadLetterStrategy

可以为 Queue 和 Topic 单独指定两个死信队列。还可以为某个 Topic 话题,单独指定一个死信队列。

把 DeadLetter 放入各自的死信通道中:

  • 对于 Queue 而言,死信通道的前缀默认为 ActiveMQ.DLQ.Queue.
  • 对于 Topic 而言,死信通道的前缀默认为 ActiveMQ.DLQ.Topic.

比如队列 Order,那么它对应的死信通道为 ActiveMQ.DLQ.Queue.Order。我们使用 queuePrefixtopicPrefix 来指定上述前缀。

默认情况下,无论是 Topic 还是 Queue,Broker 将使用 Queue 来保存 DeadLeader,即死信通道通常为 Queue,不过开发者也可以指定为 Topic。

xml
<policyEntry queue="order">
    <deadLetterStrategy>
        <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="false" />
    </deadLetterStrategy>
</policyEntry>

将队列 Order 中出现的 DeadLetter 保存在 DLQ.Order 中,不过此时 DLQ.Order 为 Topic。

属性 useQueueForTopicMessages 表示是否将 Topic 的 DeadLetter 保存在 Queue 中,默认为 true。

image-20220730214814192

自动删除过期消息

过期消息是值生产者指定的过期时间,超过这个时间的消息。

有时需要直接删除过期的消息而不需要发送到死队列中,processExpired 表示是否将过期消息放入死信队列,默认为 true。

xml
<policyEntry queue= ">">
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired= "false" />
    </deadLetterStrategy>
</policyEntry>

存放非持久化消息

默认情况下斤,ActiveMQ 不会把非持久的死消息发送到死信队列中。

processNonPersistent 表示是否将 非持久化 消息放入死信队列,默认为 false。

如果你想把非持久的消息发送到死队列中,需要设置属性 processNonPersistent = "true"

xml
<policyEntry queue= ">">
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processNonPersistent= "true" />
    </deadLetterStrategy>
</policyEntry>

防止重复消费(幂等性)

幂等性就类似表单的重复提交,在这里表单等于 Message,表单的重复提交等于 Message 的重复消费。

网络延迟传输中,会造成进行 MQ 重试中,在重试过程中,可能会造成重复消费。

解决

如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

如果上面情况还不行,准备一个第三服务方来做消费记录。以 Redis 为例,给消息分配一个全局 ID,只要消费过该消息,将 <ID, Message> 以 K-V 形式写入 Redis。那消费者开始消费前,先去 Redis 中查询有没有消费记录即可,没有记录才能消费。

幂等性解决的核心,就是根据 MessageID 去查这个消息是否被消费了。