顺序消息用于保证消息生产与消费的严格时序。是 RocketMQ 核心高级特性
顺序消息指生产者发送顺序 = 消费者消费顺序,RocketMQ 通过 hashKey 将消息路由到同一个消息队列,实现严格顺序;消费者需配置顺序消费模式。
相同 hashKey 的消息会进入同一队列,保证全局有序
适用于订单创建→支付→发货→完成等强时序业务
吞吐量低于普通消息,牺牲并发换取顺序性
顺序消息生产者服务
package com.lusifer.sca.message.producer.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderMessageService {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送顺序消息
*
* @param topic 主题
* @param tag 标签
* @param hashKey 哈希键(相同key保证消息顺序)
* @param message 消息内容
*/
public void sendOrderlyMessage(String topic, String tag, String hashKey, String message) {
// 同步顺序发送:相同hashKey路由到同一队列
SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic + ":" + tag, message, hashKey);
log.info("顺序消息发送成功,内容:{},消息ID:{}", message, sendResult.getMsgId());
}
}测试基类
package com.lusifer.sca.message.producer.test;
import com.lusifer.sca.message.producer.service.OrderMessageService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
@Slf4j
public class OrderMessageTest {
@Resource private OrderMessageService orderMessageService;
}关键修复:hashKey 必须固定才能保证顺序,使用 UUID 会导致消息分散到不同队列,无法实现顺序!
@Test
public void testSendOrderlyMessage() {
// 固定hashKey:保证所有消息进入同一个队列
String hashKey = "ORDER_10086";
orderMessageService.sendOrderlyMessage("test-topic", "tagA", hashKey, "Step 1:订单创建");
orderMessageService.sendOrderlyMessage("test-topic", "tagA", hashKey, "Step 2:订单支付");
orderMessageService.sendOrderlyMessage("test-topic", "tagA", hashKey, "Step 3:订单发货");
}必须配置 consumeMode = ConsumeMode.ORDERLY 开启顺序消费,这是接收顺序消息的核心配置:
package com.lusifer.sca.message.consumer.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/** 消息消费者 topic:绑定消息主题 consumerGroup:消费者分组,读取配置文件参数 */
@Slf4j
@Service
@RocketMQMessageListener(
topic = "test-topic",
consumerGroup = "${rocketmq.consumer.group}",
// 开启顺序消费模式
consumeMode = ConsumeMode.ORDERLY)
public class MessageConsumer implements RocketMQListener<String> {
/**
* 接收并处理消息
*
* @param message 接收到的消息体
*/
@Override
public void onMessage(String message) {
System.out.println("消费者接收消息:" + message);
}
}hashKey 必须固定:相同业务标识(如订单号、用户 ID)才能保证消息进入同一队列
顺序消费会降低并发能力:一个队列同一时间仅允许一个消费者线程消费
不支持异步 / 单向顺序发送:仅支持 syncSendOrderly 同步顺序发送