源本科技 | 码上会

分布式消息队列 RocketMQ 事务消息

2026/05/30
1
0

引言

事务消息是 RocketMQ 解决分布式事务最终一致性的核心特性,通过半消息、事务状态回查机制,保证本地事务执行与消息发送的原子性

关键概念

  • 半消息(Half Message)
    生产者发送的临时不可消费消息,Broker 已接收但未提交,仅用于标记消息预发送成功。

  • 事务状态回查
    若网络波动 / 生产者重启导致二次确认丢失,Broker 会主动回查生产者本地事务状态,保证事务最终一致性。

  • 三种事务状态

    • COMMIT:提交事务,消息可被消费者消费

    • ROLLBACK:回滚事务,消息被丢弃

    • UNKNOWN:未知状态,等待 Broker 回查

执行流程

  • 生产者发送半消息至 Broker,消息暂不可消费

  • 半消息发送成功后,执行本地事务(数据库操作等)

  • 根据本地事务结果,向 Broker 发送二次确认(COMMIT/ROLLBACK)

  • 若确认消息丢失,Broker 触发事务状态回查

  • 状态确认后,消息投递至消费者,完成事务流程

事务消息开发

事务消息生产者

package com.lusifer.sca.message.producer.service;

import com.lusifer.sca.message.producer.domain.MyObject;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class TransactionProducerService {

  private final RocketMQTemplate rocketMQTemplate;

  /**
   * 发送事务消息
   *
   * @param topic 消息主题
   * @param msgBody 消息内容
   */
  public void sendMessageInTransaction(String topic, String msgBody) {
    // 构建事务消息,绑定唯一事务ID
    Message<String> message =
        MessageBuilder.withPayload(msgBody)
            .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
            .build();

    // 发送事务消息(主题:标签,消息,业务参数)
    SendResult sendResult =
        rocketMQTemplate.sendMessageInTransaction(topic + ":tagA", message, new MyObject("张三", 18));

    log.info("事务半消息发送成功,发送结果:{}", sendResult);
  }
}

本地事务监听器

package com.lusifer.sca.message.producer.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

/**
 * 事务消息监听器
 * 核心:执行本地事务 + 事务状态回查
 */
@Slf4j
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    /**
     * 执行本地事务(半消息发送成功后自动调用)
     * @param message 事务消息
     * @param arg 业务参数
     * @return 事务状态
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        String payload = message.getPayload().toString();
        log.info("开始执行本地事务,消息内容:{},业务参数:{}", payload, arg);

        try {
            // 模拟本地事务:数据库增删改查
            boolean transactionSuccess = true;
            if (transactionSuccess) {
                log.info("本地事务执行成功,提交事务消息");
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                log.info("本地事务执行失败,回滚事务消息");
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            log.error("本地事务执行异常", e);
            // 异常返回未知状态,等待Broker回查
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    /**
     * 事务状态回查(Broker 主动回调)
     * @param message 事务消息
     * @return 事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String payload = message.getPayload().toString();
        log.info("Broker 触发事务回查,消息内容:{}", payload);

        // 模拟查询本地事务最终状态
        boolean transactionFinalSuccess = true;
        if (transactionFinalSuccess) {
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

事务消息消费者

在消费者模块创建监听,与普通消息消费逻辑一致:

package com.lusifer.sca.message.consumer.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-consumer-group")
public class TransactionConsumer implements RocketMQListener<String> {

  /** 消费事务消息(仅本地事务提交后才会触发) */
  @Override
  public void onMessage(String message) {
    log.info("成功消费事务消息:{}", message);
  }
}

测试用例

package com.lusifer.sca.message.producer.test;

import com.lusifer.sca.message.producer.service.TransactionProducerService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest
public class TransactionMessageTest {

  @Resource private TransactionProducerService transactionProducerService;

  @Test
  public void testSendTransactionMessage() {
    transactionProducerService.sendMessageInTransaction(
        "transaction-topic", "Hello RocketMQ Transaction Message!");
  }
}

注意事项

  • 事务 ID
    必须通过 RocketMQHeaders.TRANSACTION_ID 设置唯一事务 ID,用于状态回查。

  • 回查机制
    Broker 默认回查 15 次,超过次数后丢弃消息,可通过 Broker 配置调整。

  • 本地事务
    必须保证本地事务的幂等性与可查询性,回查时能准确获取事务最终状态。

  • 使用场景
    订单创建、支付通知、分布式数据同步等强一致性业务场景。