源本科技 | 码上会

分布式消息队列 RocketMQ 顺序消息

2026/05/30
0
0

引言

顺序消息用于保证消息生产与消费的严格时序。是 RocketMQ 核心高级特性

顺序消息

顺序消息指生产者发送顺序 = 消费者消费顺序,RocketMQ 通过 hashKey 将消息路由到同一个消息队列,实现严格顺序;消费者需配置顺序消费模式。

核心特性

  • 相同 hashKey 的消息会进入同一队列,保证全局有序

  • 适用于订单创建→支付→发货→完成等强时序业务

  • 吞吐量低于普通消息,牺牲并发换取顺序性

准备工作

顺序消息生产者服务

package com.lusifer.sca.message.producer.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderMessageService {

  private final RocketMQTemplate rocketMQTemplate;

  /**
   * 发送顺序消息
   *
   * @param topic 主题
   * @param tag 标签
   * @param hashKey 哈希键(相同key保证消息顺序)
   * @param message 消息内容
   */
  public void sendOrderlyMessage(String topic, String tag, String hashKey, String message) {
    // 同步顺序发送:相同hashKey路由到同一队列
    SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic + ":" + tag, message, hashKey);
    log.info("顺序消息发送成功,内容:{},消息ID:{}", message, sendResult.getMsgId());
  }
}

测试基类

package com.lusifer.sca.message.producer.test;

import com.lusifer.sca.message.producer.service.OrderMessageService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
@Slf4j
public class OrderMessageTest {
  @Resource private OrderMessageService orderMessageService;
}

顺序消息测试

关键修复hashKey 必须固定才能保证顺序,使用 UUID 会导致消息分散到不同队列,无法实现顺序

@Test
public void testSendOrderlyMessage() {
    // 固定hashKey:保证所有消息进入同一个队列
    String hashKey = "ORDER_10086";
    
    orderMessageService.sendOrderlyMessage("test-topic", "tagA", hashKey, "Step 1:订单创建");
    orderMessageService.sendOrderlyMessage("test-topic", "tagA", hashKey, "Step 2:订单支付");
    orderMessageService.sendOrderlyMessage("test-topic", "tagA", hashKey, "Step 3:订单发货");
}

顺序消息消费者

必须配置 consumeMode = ConsumeMode.ORDERLY 开启顺序消费,这是接收顺序消息的核心配置:

package com.lusifer.sca.message.consumer.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/** 消息消费者 topic:绑定消息主题 consumerGroup:消费者分组,读取配置文件参数 */
@Slf4j
@Service
@RocketMQMessageListener(
    topic = "test-topic",
    consumerGroup = "${rocketmq.consumer.group}",
    // 开启顺序消费模式
    consumeMode = ConsumeMode.ORDERLY)
public class MessageConsumer implements RocketMQListener<String> {

  /**
   * 接收并处理消息
   *
   * @param message 接收到的消息体
   */
  @Override
  public void onMessage(String message) {
    System.out.println("消费者接收消息:" + message);
  }
}

注意事项

  • hashKey 必须固定:相同业务标识(如订单号、用户 ID)才能保证消息进入同一队列

  • 顺序消费会降低并发能力:一个队列同一时间仅允许一个消费者线程消费

  • 不支持异步 / 单向顺序发送:仅支持 syncSendOrderly 同步顺序发送