源本科技 | 码上会

分布式消息队列 RocketMQ 同步消息

2026/05/30
0
0

引言

同步消息是 RocketMQ 中高可靠性的消息发送方式,发送时会阻塞当前线程,等待 Broker 服务器返回发送结果后再继续执行,极大降低消息丢失概率,适用于订单、支付等核心业务场景

准备工作

消息实体

创建序列化实体对象,用于测试复杂对象发送

package com.lusifer.sca.message.producer.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/** 测试消息实体 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MyObject {
  private String name;
  private int age;
}

同步消息

创建核心服务类,注入 RocketMQTemplate 封装同步消息发送方法

package com.lusifer.sca.message.producer.service;

import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;

/** 同步消息发送服务 */
@Service
@RequiredArgsConstructor
public class SyncMessageService {

  private final RocketMQTemplate rocketMQTemplate;
}

测试基类

package com.lusifer.sca.message.producer.test;

import com.lusifer.sca.message.producer.service.SyncMessageService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
@Slf4j
public class SyncMessageTest {
  @Resource private SyncMessageService syncMessageService;
}

普通同步消息

最简同步消息发送方式,支持直接发送对象、Spring Message 对象,无需手动序列化。

普通对象消息

/**
 * 发送普通同步消息(对象)
 * @param destination 目标(Topic)
 * @param payload 消息对象
 * @return 发送结果
 */
public SendResult syncSendObject(String destination, Object payload) {
    return rocketMQTemplate.syncSend(destination, payload);
}

测试代码

@Test
public void testSyncSendObject() {
    MyObject myObject = new MyObject("张三", 30);
    SendResult result = syncMessageService.syncSendObject("test-topic", myObject);
    log.info("对象消息发送成功,结果:{}", result);
}

Spring Message 对象

/**
 * 发送普通同步消息(Message 对象)
 * @param destination 目标(Topic)
 * @param message 消息体
 * @return 发送结果
 */
public SendResult syncSendMessage(String destination, Message<?> message) {
    return rocketMQTemplate.syncSend(destination, message);
}

测试代码

@Test
public void testSyncSendMessage() {
    Message<String> message = MessageBuilder.withPayload("Hello, RocketMQ!").build();
    SendResult result = syncMessageService.syncSendMessage("test-topic", message);
    log.info("文本消息发送成功,结果:{}", result);
}

带标签的同步消息

标签(Tag)用于 Topic 内消息细分,消费者可根据标签过滤消息,格式:Topic:Tag

/**
 * 发送带标签的同步消息
 * @param topic 主题
 * @param tag 消息标签
 * @param msg 消息内容
 * @return 发送结果
 */
public <T> SendResult syncSendWithTag(String topic, String tag, T msg) {
    String destination = topic + ":" + tag;
    return rocketMQTemplate.syncSend(destination, msg);
}

测试代码

@Test
public void testSyncSendWithTag() {
    MyObject myObject = new MyObject("张三", 30);
    SendResult result = syncMessageService.syncSendWithTag("test-topic", "tagA", myObject);
    log.info("带标签消息发送成功,结果:{}", result);
}

批量同步消息

一次性发送多条消息,提升吞吐量,批量消息总大小不能超过 4 MB

  /**
   * 发送批量同步消息
   *
   * @param destination 目标(Topic)
   * @param messages 消息集合
   * @return 发送结果
   */
  public <T> SendResult syncSendBatch(String destination, Collection<T> messages) {
    List<Message<T>> messageList =
        messages.stream().map(MessageBuilder::withPayload).map(MessageBuilder::build).toList();
    return rocketMQTemplate.syncSend(destination, messageList);
  }

测试代码

  @Test
  public void testSyncSendBatch() {
    java.util.List<MyObject> myObjects = java.util.Arrays.asList(
            new MyObject("张三", 30),
            new MyObject("李四", 25),
            new MyObject("王五", 28)
    );
    SendResult result = syncMessageService.syncSendBatch("test-topic", myObjects);
    log.info("批量消息发送成功,结果:{}", result);
  }

自定义超时同步消息

自定义消息发送超时时间(默认 3000 毫秒),适配网络波动场景

/**
 * 发送带超时时间的同步消息
 * @param destination 目标(Topic)
 * @param payload 消息内容
 * @param timeout 超时时间(单位:毫秒)
 * @return 发送结果
 */
public SendResult syncSendWithTimeout(String destination, Object payload, long timeout) {
    return rocketMQTemplate.syncSend(destination, payload, timeout);
}

测试代码

@Test
public void testSyncSendWithTimeout() {
    MyObject myObject = new MyObject("张三", 30);
    // 超时时间设置为 5000 毫秒
    SendResult result = syncMessageService.syncSendWithTimeout("test-topic", myObject, 5000);
    log.info("自定义超时消息发送成功,结果:{}", result);
}

延迟同步消息

指定消息延迟级别,消息到达 Broker 后不会立即投递,等待对应时间后才会被消费者接收,适用于定时任务、超时关闭订单等场景。

  /**
   * 发送延迟同步消息
   *
   * @param destination 目标(Topic)
   * @param payload 消息内容
   * @param timeout 超时时间(毫秒)
   * @param delayLevel 延迟级别
   * @return 发送结果
   */
  public SendResult syncSendWithDelay(
      String destination, Object payload, long timeout, int delayLevel) {
    Message<Object> message = MessageBuilder.withPayload(payload).build();
    return rocketMQTemplate.syncSend(destination, message, timeout, delayLevel);
  }

测试代码

@Test
public void testSyncSendWithDelay() {
    String message = "Hello, Delayed RocketMQ!";
    // 延迟级别 3 = 10 秒后投递
    SendResult result = syncMessageService.syncSendWithDelay("test-topic", message, 5000, 3);
    log.info("延迟消息发送成功,结果:{}", result);
}

延迟级别对照表

RocketMQ 内置 18 个固定延迟级别,不支持自定义时间:

延迟级别

延迟时间

延迟级别

延迟时间

1

1 秒

10

6 分钟

2

5 秒

11

7 分钟

3

10 秒

12

8 分钟

4

30 秒

13

9 分钟

5

1 分钟

14

10 分钟

6

2 分钟

15

20 分钟

7

3 分钟

16

30 分钟

8

4 分钟

17

1 小时

9

5 分钟

18

2 小时

注意事项

  1. 同步消息特性:阻塞线程、等待 Broker ACK、可靠性最高,适合核心业务;

  2. 批量消息限制:单批次消息总大小不超过 4 MB,无顺序保证;

  3. 标签使用:消费者可通过 selectorExpression = "tagA" 精准过滤消息;

  4. 延迟消息:仅支持固定级别,无法自定义秒数,最大延迟 2 小时;

  5. 序列化:Spring Boot Starter 自动使用 JSON 序列化,实体类必须提供无参构造。