事务消息是 RocketMQ 解决分布式事务最终一致性的核心特性,通过半消息、事务状态回查机制,保证本地事务执行与消息发送的原子性
半消息(Half Message)
生产者发送的临时不可消费消息,Broker 已接收但未提交,仅用于标记消息预发送成功。
事务状态回查
若网络波动 / 生产者重启导致二次确认丢失,Broker 会主动回查生产者本地事务状态,保证事务最终一致性。
三种事务状态
COMMIT:提交事务,消息可被消费者消费
ROLLBACK:回滚事务,消息被丢弃
UNKNOWN:未知状态,等待 Broker 回查
生产者发送半消息至 Broker,消息暂不可消费
半消息发送成功后,执行本地事务(数据库操作等)
根据本地事务结果,向 Broker 发送二次确认(COMMIT/ROLLBACK)
若确认消息丢失,Broker 触发事务状态回查
状态确认后,消息投递至消费者,完成事务流程
package com.lusifer.sca.message.producer.service;
import com.lusifer.sca.message.producer.domain.MyObject;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class TransactionProducerService {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息
*
* @param topic 消息主题
* @param msgBody 消息内容
*/
public void sendMessageInTransaction(String topic, String msgBody) {
// 构建事务消息,绑定唯一事务ID
Message<String> message =
MessageBuilder.withPayload(msgBody)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.build();
// 发送事务消息(主题:标签,消息,业务参数)
SendResult sendResult =
rocketMQTemplate.sendMessageInTransaction(topic + ":tagA", message, new MyObject("张三", 18));
log.info("事务半消息发送成功,发送结果:{}", sendResult);
}
}package com.lusifer.sca.message.producer.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
/**
* 事务消息监听器
* 核心:执行本地事务 + 事务状态回查
*/
@Slf4j
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
/**
* 执行本地事务(半消息发送成功后自动调用)
* @param message 事务消息
* @param arg 业务参数
* @return 事务状态
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
String payload = message.getPayload().toString();
log.info("开始执行本地事务,消息内容:{},业务参数:{}", payload, arg);
try {
// 模拟本地事务:数据库增删改查
boolean transactionSuccess = true;
if (transactionSuccess) {
log.info("本地事务执行成功,提交事务消息");
return RocketMQLocalTransactionState.COMMIT;
} else {
log.info("本地事务执行失败,回滚事务消息");
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
log.error("本地事务执行异常", e);
// 异常返回未知状态,等待Broker回查
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 事务状态回查(Broker 主动回调)
* @param message 事务消息
* @return 事务状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String payload = message.getPayload().toString();
log.info("Broker 触发事务回查,消息内容:{}", payload);
// 模拟查询本地事务最终状态
boolean transactionFinalSuccess = true;
if (transactionFinalSuccess) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}在消费者模块创建监听,与普通消息消费逻辑一致:
package com.lusifer.sca.message.consumer.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-consumer-group")
public class TransactionConsumer implements RocketMQListener<String> {
/** 消费事务消息(仅本地事务提交后才会触发) */
@Override
public void onMessage(String message) {
log.info("成功消费事务消息:{}", message);
}
}package com.lusifer.sca.message.producer.test;
import com.lusifer.sca.message.producer.service.TransactionProducerService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@Slf4j
@SpringBootTest
public class TransactionMessageTest {
@Resource private TransactionProducerService transactionProducerService;
@Test
public void testSendTransactionMessage() {
transactionProducerService.sendMessageInTransaction(
"transaction-topic", "Hello RocketMQ Transaction Message!");
}
}事务 ID
必须通过 RocketMQHeaders.TRANSACTION_ID 设置唯一事务 ID,用于状态回查。
回查机制
Broker 默认回查 15 次,超过次数后丢弃消息,可通过 Broker 配置调整。
本地事务
必须保证本地事务的幂等性与可查询性,回查时能准确获取事务最终状态。
使用场景
订单创建、支付通知、分布式数据同步等强一致性业务场景。