Skip to content

在掌握 RocketMQ 的基本实现逻辑之后,接下来通过 Java 程序来学习 RocketMQ 的多种消息示例,RocketMQ 提供了发送多种发送消息的模式,例如同步消息,异步消息,顺序消息,延迟消息,事务消息等。它们拥有各自的应用场景。

消息发送和监听的流程

我们先搞清楚消息发送和监听的流程,然后我们在开始敲代码

消息生产者

  1. 创建消息生产者 Producer,并制定生产者组名
  2. 指定 Name Server 地址
  3. 启动 Producer
  4. 创建消息对象,指定主题 Topic、Tag 和消息体等
  5. 发送消息
  6. 关闭生产者 Producer

消息消费者

  1. 创建消费者 Consumer,制定消费者组名
  2. 指定 Name Server 地址
  3. 创建监听订阅主题 Topic 和 Tag 等
  4. 处理消息
  5. 启动消费者 Consumer

构建 Java 环境

在 Maven 项目中构建出 RocketMQ 消息示例的基础环境,即创建生产者程序和消费者程序。通过生产者和消费者了解 RocketMQ 操作消息的原生 API。

引入依赖,版本和服务器的 RocketMQ 版本保持一致:

xml
<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.4.0</version>
</dependency>

基础示例

生产者程序

java
public class BaseProducer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        // 1. 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");
        // 2. 连接 NameServer
        producer.setNamesrvAddr("192.168.199.32:9876");
        // 3. 启动生产者
        producer.start();

        // 4. 生产消息
        for (int i = 0; i < 10; i++) {
            // 第一个参数:主题的名字
            // 第二个参数:消息内容
            Message message = new Message("myTopic1", "tagA", ("hello rocketmq " + i).getBytes(StandardCharsets.UTF_8));
            SendResult result = producer.send(message, 50000);
            System.out.println(result);
        }
        
        // 5. 关闭生产者
        producer.shutdown();
    }
}

消费者程序

java
public class BaseConsumer {
    public static void main(String[] args) throws MQClientException {
        // 1. 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");
        // 2. 连接 NamServer
        consumer.setNamesrvAddr("192.168.199.32:9876");
        // 3. 订阅主题
        consumer.subscribe("myTopic1", "*");

        consumer.registerMessageListener((MessageListenerOrderly) (messageExtList, consumeOrderlyContext) -> {
            messageExtList.forEach(msg -> {
                System.out.println("收到的消息:" + msg);
                // System.out.println("收到的消息:" + new String(msg.getBody()));
            });
            return ConsumeOrderlyStatus.SUCCESS;
        });

        // 5. 启动消费者
        consumer.start();
    }
}

流程如下图:

image-20231014014151247

简单消息示例

简单消息分成三种: 同步消息、异步消息、单向消息

同步消息

生产者发送消息后,必须等待 Broker 返回信息后才继续之后的业务逻辑,在 Broker 返回信息之前,生产者阻塞等待,而且在 MQ 集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式。

img

java
public class SyncProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {

        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("172.16.253.101:9876");
        // 启动实例
        producer.start();

        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            // Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }

}

同步消息的应用场景: 如重要通知消息、短信通知、短信营销系统等。

异步消息

生产者发完消息后,不需要等待 Broker 的回信,可以直接执行之后的业务逻辑。生产者提供一个回调函数供 Broker 调用,体现了异步的方式。

java
public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("172.16.253.101:9876");
        // 启动实例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);

        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("Jodie_topic_1023",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                
                // 第二个参数就是异步回调
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        System.out.println("=============");
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

异步传输一般用于响应时间敏感的业务场景。

单向消息

生产者发送完消息后不需要等待任何回复,直接进行之后的业务逻辑,单向传输用于需要中等可靠性的情况,例如日志收集。

java
public class OnewayProducer {

    public static void main(String[] args) throws Exception {
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        // 设置 NameServer 地址
        producer.setNamesrvAddr("172.16.253.101:9876");

        // 启动实例
        producer.start();

        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        
        //Wait for sending to complete
        Thread.sleep(5000);
        producer.shutdown();
    }
}

顺序消息

顺序消息指的是消费者消费消息的顺序按照发送者发送消息的顺序执行。顺序消息分成两种: 局部顺序和全局顺序。

顺序消费的原理解析,在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 Queue(分区队列);而消费消息的时候从多个 Queue 上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个 Queue 中,消费的时候只从这个 Queue 上依次拉取,则就保证了顺序。当发送和消费参与的 Queue 只有一个,则是全局有序;如果多个 Queue 参与,则为分区有序,即相对每个 Queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中,消费时,同一个顺序获取到的肯定是同一个队列。

局部顺序

局部消息指的是消费者消费某个 Topic 的某个队列中的消息是顺序的。消费者使用 MessageListenerOrderly 类做消息监听,实现局部顺序。

生产者

java
public class OrderProducer {

    public static void main(String[] args) throws Exception {

        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        producer.setNamesrvAddr("192.168.199.32:9876");

        // 名字服务器的地址已经在环境变量中配置好了:NAMESRV_ADDR=172.16.253.101:9876
        // 启动实例
        producer.start();
        
        for (int i = 0; i < 10; i++) {
            int orderId = i;

            for (int j = 0; j <= 5; j++) {
                Message msg =
                        new Message("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
                                ("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }
        }
        // server shutdown
        producer.shutdown();
    }
}

消费者

java
public class OrderConsumer {
    
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("192.168.199.32:9876");
        
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("OrderTopicTest", "*");

        // MessageListenerOrderly 代表遵循队列的顺序来消费 
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.println("消息内容:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;

            }
        });
        
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

全局顺序

消费者消费全部消息都是顺序的,只能通过一个某个 Topic 只有一个队列才能实现这种应用场景较少,且性能较差。

乱序消费

消费者消费消息不需要关注消息的顺序。消费者使用MessageListenerConcurrently类做消息监听。

java
public class OrderConsumer {
    
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("192.168.199.32:9876");
        
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("OrderTopicTest", "*");
        
        // MessageListenerConcurrently 代表没有遵守顺序消费,随机在队列里消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for(MessageExt msg:msgs){
                    System.out.println("消息内容:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

广播消息

播是向主题 (topic) 的所有订阅者发送消息。订阅同一个 topic 的多个消费者,能全量收到生产者发送的所有消息,

消费者

java
public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("192.168.199.32:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 设置广播消息
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消息内容:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

生产者

java
public class BroadcastProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.199.32:9876");
        
        producer.start();

        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    ("Hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

延迟消息

延迟消息与普通消息的不同之处在于,它们要等到指定的时间之后才会被传递给消费者。

生产者

java
public class ScheduledProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("192.168.199.32:9876");
        // 启动生产者
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // 设置延迟为 3 级
            message.setDelayTimeLevel(3);
            // 发送消息
            producer.send(message);
        }
        producer.shutdown();
    }
}

setDelayTimeLevel 为设置延迟等级,RocketMQ 设计了 18 个延迟等级,分别是

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

等级 3 对应的是 10s。系统为这 18 个等级配置了 18 个 Topic,用于实现延迟队列的效果。

也可以具体设置多少毫秒

java
public class ScheduledProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("192.168.199.32:9876");
        // 启动生产者
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // 设置延迟为 10000 ms
            message.setDelayTimeMs(10000);
            // 发送消息
            producer.send(message);
        }
        // 关闭生产者
        producer.shutdown();
    }
}

消费者

java
public class ScheduledConsumer {
    public static void main(String[] args) throws MQClientException {
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        consumer.setNamesrvAddr("192.168.199.32:9876");
        // Subscribe topics
        consumer.subscribe("TestTopic", "*");
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}

批量消息

批量发送消息提高了传递小消息的性能。

生产者

java
public class BatchProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.199.32:9876");
        
        producer.start();
        
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
        producer.send(messages);
        producer.shutdown();


    }
}

超出限制的批量消息

官方建议批量消息的总大小不应超过 1m,实际不应超过 4m。如果超过 4m 的批量消息需要进行分批处理,同时设置 Broker 的配置参数为 4m (在 Broker 的配置文件中修改: maxMessageSize=4194304)

分批处理生产者

java
public class MaxBatchProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.setNamesrvAddr("192.168.199.32:9876");
        producer.start();

        // large batch
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>(100 * 1000);
        for (int i = 0; i < 100 * 1000; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }
        // producer.send(messages);

        // split the large batch into small ones:
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
        }
        producer.shutdown();
    }
}

ListSplitter

java
public class ListSplitter implements Iterator<List<Message>> {
    private int sizeLimit = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > sizeLimit) {
                //it is unexpected that single message exceeds the sizeLimit
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                    //if the next sublist has no element, add this one and then break, otherwise just break
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

使用限制:司一批次的消息应该具有: 相同的主题、相同的 waitStoreMsgOK 并且不支持延迟消息和事务消息。

过滤消息

RocketMQ 提供消息过滤功能,通过 Tag 或者 Key 进行区分。

我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有 tagA 标签的被 A 消费,带有 tagB 标签的被 B 消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才区别对待。

Tag

生产者

java
public class TagProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.199.32:9876");
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 15; i++) {
            Message msg = new Message("TagFilterTest",
                    tags[i % tags.length],
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

消费者,过滤出需要的 tag

java
public class TagConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("192.168.199.32:9876");

        consumer.subscribe("TagFilterTest", "TagA || TagC");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

消费者将收到包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是一条消息只能有一个标签,这可能不适用于复杂的场景。在这种情况下,您可以使用 SOL 表达式来过滤掉消息。

使用 SOL 过滤

SQL 功能可以通过您在发送消息时输入的属性进行一些计算。在 RocketMQ 定义的语法下,可以实现一些有趣的逻辑。这是一个例子:

java
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------

RocketMQ 只定义了⼀些基本的语法来⽀持这个特性,也可以轻松扩展它。

1. 数值⽐较,如`>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
2. 字符⽐较,如`=`, `<>`, `IN`;
3. `IS NULL`或`IS NOT NULL`;
4. 逻辑`AND`, `OR`, `NOT`;

常量类型有:

1. 数字,如 123、3.1415;
2. 字符,如'abc',必须⽤单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE`或`FALSE`;

使用注意: 只有推模式的消费者可以使用 SOL 过滤。拉模式是用不了的。

SQL 过滤生产者

java
public class SQLProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.199.32:9876");

        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        for (int i = 0; i < 15; i++) {
            Message msg = new Message("SqlFilterTest",
                    tags[i % tags.length],
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // Set some properties.
            msg.putUserProperty("a", String.valueOf(i));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

SQL 过滤消费者

java
public class SQLConsumer {
    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("192.168.199.32:9876");

        // Don't forget to set enablePropertyFilter=true in broker
        consumer.subscribe("SqlFilterTest",
                MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                        "and (a is not null and a between 0 and 3)"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

Key

在 RocketMQ 中的消息,默认会有一个 messageId 当做消息的唯一标识,我们也可以给消息携带一个 key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用 messageId 或者 key 来进行查询。

带 key 消息生产者

java
@Test
public void testKeyProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("192.168.199.32:9876:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest","tagA","key", "我是一个带标记和key的消息".getBytes());
    SendResult send = producer.send(msg);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

带 key 消息消费者

java
@Test
public void testKeyConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("192.168.199.32:9876:9876");
    // 订阅一个主题来消费  表达式,默认是*,支持"tagA || tagB || tagC" 这样或者的写法 只要是符合任何一个标签都可以消费
    consumer.subscribe("TopicTest", "tagA || tagB || tagC");
    // 注册一个消费监听 MessageListenerConcurrently 是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
            System.out.println(msgs.get(0).getTags());
            System.out.println(msgs.get(0).getKeys());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

img

事务消息

事务消息可以被认为是一个两阶段的提交消息实现,以确保分布式系统的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行。

事务消息的实现流程

image-20231014020012038

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

事务消息发送及提交

  • 发送消息(Half 消息)
  • 服务端响应消息写入结果
  • 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
  • 根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)

事务补偿

  • 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次「回查」
  • Producer 收到回查消息,检查回查消息对应的本地事务的状态
  • 根据本地事务状态,重新 Commit 或者 Rollback

其中,补偿阶段用于解决消息 UNKNOW 或者 Rollback 发生超时或者失败的情况

事务消息状态

事务消息有三种状态:

  • TransactionStatus.CommitTransaction: 提交事务,表示允许消费者消费该消息
  • TransactionStatus.RollbackTransaction: 回滚事务,表示该消息将被删除,不允许消费
  • TransactionStatus.Unknown: 中间状态,表示需要 MQ 回查才能确定状态

生产者

java
public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("172.16.253.101:9876");
      
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送事务消息
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

本地事务处理类 TransactionListenerImpl

java
public class TransactionListenerImpl implements TransactionListener {
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getTags();
        if(StringUtils.contains(tags,"TagA")){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if(StringUtils.contains(tags,"TagB")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String tags = msg.getTags();
        if(StringUtils.contains(tags,"TagC")){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if(StringUtils.contains(tags,"TagD")){
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
}

消费者

java
public class TransactionConsumer {

    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");
        // 2.指明nameserver的地址
        consumer.setNamesrvAddr("172.16.253.101:9876");
        // 3.订阅主题:topic 和过滤消息用的tag表达式
        consumer.subscribe("TopicTest", "*");
        // 4.创建一个监听器,当broker把消息推过来时调用
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                for (MessageExt msg : msgs) {
                    // System.out.println("收到的消息:"+new String(msg.getBody()));
                    System.out.println("收到的消息:" + msg);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }
}

使用限制

  • 事务性消息没有调度和批处理支持
  • 为避免单条消息被检查次数过多,导致半队列消息堆积,我们默认将单条消息的检查次数限制为 15 次,但用户可以通过更改 transactionCheckMax 来更改此限制参数在 Broker 的配置中,如果一条消息的检查次数超过 transactionCheckMax 次,Broker 默认会丢弃这条消息,同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来改变这种行为。
  • 事务消息将在一定时间后检查,该时间由代理配置中的参数
  • transactionTimeout 确定。并且用户也可以在发送事务消息时通过设置用户属性 CHECK IMMUNITY_TIME_IN _SECONDS 来改变这个限制,这个参数优先于 transactionMsgTimeout 参数
  • 一个事务性消息可能会被检查或消费不止一次
  • 提交给用户目标主题的消息 reput 可能会失败。目前,它取决于日志记录高可用是由 RocketMQ 本身的高可用机制来保证的。如果要保证事务消息不丢失,保证事务完整性,推荐使用同步双写机制
  • 事务性消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务性消息允许向后查询。MQ 服务器通过其生产者 ID 查询客户端