同步消息是 RocketMQ 中高可靠性的消息发送方式,发送时会阻塞当前线程,等待 Broker 服务器返回发送结果后再继续执行,极大降低消息丢失概率,适用于订单、支付等核心业务场景
创建序列化实体对象,用于测试复杂对象发送
package com.lusifer.sca.message.producer.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/** 测试消息实体 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MyObject {
private String name;
private int age;
}创建核心服务类,注入 RocketMQTemplate 封装同步消息发送方法
package com.lusifer.sca.message.producer.service;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
/** 同步消息发送服务 */
@Service
@RequiredArgsConstructor
public class SyncMessageService {
private final RocketMQTemplate rocketMQTemplate;
}package com.lusifer.sca.message.producer.test;
import com.lusifer.sca.message.producer.service.SyncMessageService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
@Slf4j
public class SyncMessageTest {
@Resource private SyncMessageService syncMessageService;
}最简同步消息发送方式,支持直接发送对象、Spring Message 对象,无需手动序列化。
/**
* 发送普通同步消息(对象)
* @param destination 目标(Topic)
* @param payload 消息对象
* @return 发送结果
*/
public SendResult syncSendObject(String destination, Object payload) {
return rocketMQTemplate.syncSend(destination, payload);
}测试代码
@Test
public void testSyncSendObject() {
MyObject myObject = new MyObject("张三", 30);
SendResult result = syncMessageService.syncSendObject("test-topic", myObject);
log.info("对象消息发送成功,结果:{}", result);
}/**
* 发送普通同步消息(Message 对象)
* @param destination 目标(Topic)
* @param message 消息体
* @return 发送结果
*/
public SendResult syncSendMessage(String destination, Message<?> message) {
return rocketMQTemplate.syncSend(destination, message);
}测试代码
@Test
public void testSyncSendMessage() {
Message<String> message = MessageBuilder.withPayload("Hello, RocketMQ!").build();
SendResult result = syncMessageService.syncSendMessage("test-topic", message);
log.info("文本消息发送成功,结果:{}", result);
}标签(Tag)用于 Topic 内消息细分,消费者可根据标签过滤消息,格式:Topic:Tag。
/**
* 发送带标签的同步消息
* @param topic 主题
* @param tag 消息标签
* @param msg 消息内容
* @return 发送结果
*/
public <T> SendResult syncSendWithTag(String topic, String tag, T msg) {
String destination = topic + ":" + tag;
return rocketMQTemplate.syncSend(destination, msg);
}测试代码
@Test
public void testSyncSendWithTag() {
MyObject myObject = new MyObject("张三", 30);
SendResult result = syncMessageService.syncSendWithTag("test-topic", "tagA", myObject);
log.info("带标签消息发送成功,结果:{}", result);
}一次性发送多条消息,提升吞吐量,批量消息总大小不能超过 4 MB。
/**
* 发送批量同步消息
*
* @param destination 目标(Topic)
* @param messages 消息集合
* @return 发送结果
*/
public <T> SendResult syncSendBatch(String destination, Collection<T> messages) {
List<Message<T>> messageList =
messages.stream().map(MessageBuilder::withPayload).map(MessageBuilder::build).toList();
return rocketMQTemplate.syncSend(destination, messageList);
}测试代码
@Test
public void testSyncSendBatch() {
java.util.List<MyObject> myObjects = java.util.Arrays.asList(
new MyObject("张三", 30),
new MyObject("李四", 25),
new MyObject("王五", 28)
);
SendResult result = syncMessageService.syncSendBatch("test-topic", myObjects);
log.info("批量消息发送成功,结果:{}", result);
}自定义消息发送超时时间(默认 3000 毫秒),适配网络波动场景
/**
* 发送带超时时间的同步消息
* @param destination 目标(Topic)
* @param payload 消息内容
* @param timeout 超时时间(单位:毫秒)
* @return 发送结果
*/
public SendResult syncSendWithTimeout(String destination, Object payload, long timeout) {
return rocketMQTemplate.syncSend(destination, payload, timeout);
}测试代码
@Test
public void testSyncSendWithTimeout() {
MyObject myObject = new MyObject("张三", 30);
// 超时时间设置为 5000 毫秒
SendResult result = syncMessageService.syncSendWithTimeout("test-topic", myObject, 5000);
log.info("自定义超时消息发送成功,结果:{}", result);
}指定消息延迟级别,消息到达 Broker 后不会立即投递,等待对应时间后才会被消费者接收,适用于定时任务、超时关闭订单等场景。
/**
* 发送延迟同步消息
*
* @param destination 目标(Topic)
* @param payload 消息内容
* @param timeout 超时时间(毫秒)
* @param delayLevel 延迟级别
* @return 发送结果
*/
public SendResult syncSendWithDelay(
String destination, Object payload, long timeout, int delayLevel) {
Message<Object> message = MessageBuilder.withPayload(payload).build();
return rocketMQTemplate.syncSend(destination, message, timeout, delayLevel);
}测试代码
@Test
public void testSyncSendWithDelay() {
String message = "Hello, Delayed RocketMQ!";
// 延迟级别 3 = 10 秒后投递
SendResult result = syncMessageService.syncSendWithDelay("test-topic", message, 5000, 3);
log.info("延迟消息发送成功,结果:{}", result);
}RocketMQ 内置 18 个固定延迟级别,不支持自定义时间:
同步消息特性:阻塞线程、等待 Broker ACK、可靠性最高,适合核心业务;
批量消息限制:单批次消息总大小不超过 4 MB,无顺序保证;
标签使用:消费者可通过 selectorExpression = "tagA" 精准过滤消息;
延迟消息:仅支持固定级别,无法自定义秒数,最大延迟 2 小时;
序列化:Spring Boot Starter 自动使用 JSON 序列化,实体类必须提供无参构造。