源本科技 | 码上会

分布式事务 Seata XA 模式

2026/05/24
1
0

XA 模式交互原理

下图完整展示 Seata XA 模式事务流转逻辑,各流程环节、对应组件与执行动作对应关系如下表所示:

流程环节

涉及组件

核心动作

1.1 开启事务

事务管理器 TM

向事务协调者 TC 申请创建全局事务,TC 生成并返回唯一全局事务 ID(XID)

1.2 调用分支事务

事务管理器 TM

将全局事务 XID 透传至各个参与业务的微服务节点

1.3 注册分支事务

资源管理器 RM

分支服务向 TC 登记分支事务信息,获取专属分支事务标识

1.4 执行业务 SQL

资源管理器 RM

执行本地数据库增删改操作,事务执行后暂时阻塞,不主动提交数据

1.5 报告事务状态

资源管理器 RM

向 TC 上报本地分支事务执行成功或失败状态

2.1 提交 / 回滚事务

事务管理器 TM

根据整体业务执行结果,向 TC 发起全局事务提交或回滚指令

2.2 检查分支事务状态

事务协调者 TC

汇总所有分支事务状态,全成功则触发统一提交,存在失败则触发整体回滚

2.3 提交 / 回滚事务

资源管理器 RM

响应 TC 调度指令,完成本地事务最终提交或数据回滚操作

Seata XA 模式遵循 X/Open XA 行业规范,依托 两阶段提交 2PC 协议实现分布式事务管控,能够严格保证跨库、跨服务数据操作的原子一致性。

事务核心组件角色

  • 事务协调者 TC
    独立部署的 Seata 服务端节点,作为分布式事务调度中枢,统一管控全局事务生命周期,统筹所有分支事务提交与回滚决策。

  • 事务管理器 TM
    内嵌于业务微服务中,是全局事务的发起方,负责开启全局事务、判定事务最终结果,向 TC 下发事务终结指令。

  • 资源管理器 RM
    绑定各个业务数据库,对接支持 XA 协议的数据库驱动,负责执行本地分支事务、上报执行状态,响应 TC 事务调度命令。

两阶段执行机制

XA 模式将完整事务拆分为两个独立阶段执行,规避分布式场景下数据不一致问题:

准备阶段

  1. TM 向所有参与事务的 RM 发送预执行请求

  2. RM 执行本地业务 SQL,锁定数据资源,事务暂不提交

  3. RM 将自身执行状态反馈至 TC

提交阶段

  1. TC 汇总全部分支状态,全部预执行成功则进入提交流程

  2. TC 向所有 RM 下发提交指令,各节点正式持久化数据

  3. 任意分支预执行失败,立即下发回滚指令,所有节点撤销本次数据变更

模式特性与适用边界

  • 数据一致性:严格遵循 2PC 协议,实现强一致性数据管控,满足高可靠业务要求

  • 运行性能:多轮网络交互叠加数据库锁占用,整体性能偏弱,吞吐量有限

  • 环境要求:底层数据库必须原生支持 XA 协议,主流 MySQL 8.0、Oracle 均可适配

  • 适用场景:银行转账、资金结算、订单扣款等对数据一致性优先级极高的业务场景

库存服务代码

库存服务作为事务分支服务,提供商品库存扣减能力,接入 Seata XA 事务模式,独立对接 3308 端口库存数据库。

领域模型

按照 DDD 领域分层思想编写实体类,绑定数据库库存表结构

package com.lusifer.sca.service.stock.domain;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("t_stock")
public class Stock {
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    private Integer productId;
    private Integer count;
}

数据访问层

继承 MyBatis-Plus 基础接口,快速实现数据库基础操作

package com.lusifer.sca.service.stock.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lusifer.sca.service.stock.domain.Stock;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface StockMapper extends BaseMapper<Stock> {

}

业务接口定义

封装库存查询、库存扣减核心业务方法,定义异常抛出规则

package com.lusifer.sca.service.stock.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.lusifer.sca.service.stock.domain.Stock;

public interface StockService extends IService<Stock> {
    /**
     * 根据商品 ID 查询库存信息
     * @param productId 商品 ID
     * @return 库存实体数据
     */
    Stock findByProductId(Integer productId);

    /**
     * 扣减对应商品库存数量
     * @param productId 商品 ID
     * @param count 扣减数量
     * @throws Exception 库存异常、商品不存在异常
     */
    void reduceStock(Integer productId, Integer count) throws Exception;
}

业务接口实现

编写业务逻辑,增加数据合法性校验,适配 Seata 分布式事务注解

package com.lusifer.sca.service.stock.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.lusifer.sca.service.stock.domain.Stock;
import com.lusifer.sca.service.stock.mapper.StockMapper;
import com.lusifer.sca.service.stock.service.StockService;
import org.apache.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

@Service
public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements StockService {

  @Override
  public Stock findByProductId(Integer productId) {
    LambdaQueryWrapper<Stock> queryWrapper = new LambdaQueryWrapper<>();
    queryWrapper.eq(Stock::getProductId, productId);
    return getOne(queryWrapper);
  }

  @Override
  @GlobalTransactional(rollbackFor = Exception.class)
  public void reduceStock(Integer productId, Integer count) throws Exception {
    Stock stock = findByProductId(productId);
    if (stock == null) {
      throw new RuntimeException("对应商品数据不存在");
    }
    if (stock.getCount() < count) {
      throw new RuntimeException("商品库存数量不足");
    }
    stock.setCount(stock.getCount() - count);
    updateById(stock);
  }
}

接口控制器

提供 HTTP 对外访问接口,捕获业务异常并返回统一响应结果

package com.lusifer.sca.service.stock.controller;

import com.lusifer.sca.service.stock.service.StockService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class StockController {
    private final StockService stockService;

    @GetMapping("/reduce/stock/{productId}/{count}")
    public ResponseEntity<String> reduceStock(@PathVariable Integer productId, @PathVariable Integer count) {
        try {
            stockService.reduceStock(productId, count);
            return ResponseEntity.ok("success");
        } catch (Exception e) {
            return new ResponseEntity<>("error", HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }
}

服务启动类

开启服务注册、Mapper 扫描,适配 Spring Boot 3 启动规范

package com.lusifer.sca.service.stock;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@EnableDiscoveryClient
@SpringBootApplication
@MapperScan("com.lusifer.sca.service.stock.mapper")
public class StockApplication {
    public static void main(String[] args) {
        SpringApplication.run(StockApplication.class, args);
    }
}

服务配置文件

优化数据库时区、Seata 事务模式、服务注册参数

server:
  port: 0

spring:
  application:
    name: service-stock
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.203.200:8848
        enabled: true
        username: nacos
        password: 123456
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.203.200:3308/sca_stock?useSSL=false&serverTimezone=Asia/Shanghai&allowMultiQueries=true
    username: root
    password: 123456

seata:
  application-id: service-stock
  registry:
    type: file
  tx-service-group: seata_tx_group
  service:
    vgroup-mapping:
      seata_tx_group: default
    grouplist:
      default: 192.168.203.200:8091
  data-source-proxy-mode: XA

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

订单服务代码开发

订单服务为全局事务发起端,调用库存服务完成跨服务调用,同时写入本地订单数据库,通过全局事务注解管控整体事务一致性,服务端口 18082,对接 3307 端口订单数据库。

领域模型

订单实体类映射订单数据表字段

package com.lusifer.sca.service.order.domain;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@Data
@TableName("t_order")
public class Order {
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    private Integer userId;
    private Integer productId;
    private Integer count;
}

数据访问层

订单数据持久化操作接口

package com.lusifer.sca.service.order.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lusifer.sca.service.order.domain.Order;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface OrderMapper extends BaseMapper<Order> {

}

远程调用接口

基于 OpenFeign 编写库存服务调用客户端,集成 Sentinel 熔断降级机制

package com.lusifer.sca.service.order.feign;

import com.lusifer.sca.service.order.feign.fallback.StockClientFallback;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient(name = "service-stock", fallback = StockClientFallback.class)
public interface StockClient {
    @GetMapping("/reduce/stock/{productId}/{count}")
    String reduceStock(@PathVariable("productId") Integer productId, @PathVariable("count") Integer count);
}

熔断降级实现

服务调用异常时触发兜底逻辑,保障服务稳定性

package com.lusifer.sca.service.order.feign.fallback;

import com.lusifer.sca.service.order.feign.StockClient;
import org.springframework.stereotype.Component;

@Component
public class StockClientFallback implements StockClient {
    @Override
    public String reduceStock(Integer productId, Integer count) {
        return "reduceStock fallback";
    }
}

业务接口定义

定义创建订单核心业务方法

package com.lusifer.sca.service.order.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.lusifer.sca.service.order.domain.Order;

public interface OrderService extends IService<Order> {
    /**
     * 生成订单并扣减商品库存
     * @param userId 用户 ID
     * @param productId 商品 ID
     * @param count 下单数量
     */
    void createOrder(Integer userId, Integer productId, Integer count);
}

业务接口实现

全局事务注解标记事务发起方法,跨服务调用与本地入库组合执行,预留异常模拟测试点位

package com.lusifer.sca.service.order.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.lusifer.sca.service.order.domain.Order;
import com.lusifer.sca.service.order.feign.StockClient;
import com.lusifer.sca.service.order.mapper.OrderMapper;
import com.lusifer.sca.service.order.service.OrderService;
import lombok.RequiredArgsConstructor;
import org.apache.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
  private final StockClient stockClient;

  @Override
  @GlobalTransactional(rollbackFor = Exception.class)
  public void createOrder(Integer userId, Integer productId, Integer count) {
    // 远程调用库存服务扣减库存
    stockClient.reduceStock(productId, count);
    // 本地数据库新增订单数据
    Order order = new Order();
    order.setUserId(userId);
    order.setProductId(productId);
    order.setCount(count);
    save(order);

    // 解开注释可模拟业务异常,验证事务回滚效果
    throw new RuntimeException("模拟订单业务执行异常");
  }
}

接口控制器

提供订单创建访问入口

package com.lusifer.sca.service.order.controller;

import com.lusifer.sca.service.order.service.OrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class OrderController {
    private final OrderService orderService;

    @GetMapping("/order/create")
    public void create() {
        orderService.createOrder(1, 1, 1);
    }
}

服务启动类

开启远程调用、服务注册、Mapper 扫描相关注解

package com.lusifer.sca.service.order;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

@EnableFeignClients
@EnableDiscoveryClient
@SpringBootApplication
@MapperScan("com.lusifer.sca.service.order.mapper")
public class OrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }
}

服务配置文件

配置端口、数据库、注册中心、Feign 熔断与 Seata 事务参数

server:
  port: 18082

spring:
  application:
    name: service-order
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.203.200:8848
        enabled: true
        username: nacos
        password: 123456
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.203.200:3307/sca_order?useSSL=false&serverTimezone=Asia/Shanghai&allowMultiQueries=true
    username: root
    password: 123456

seata:
  application-id: service-order
  registry:
    type: file
  tx-service-group: seata_tx_group
  service:
    vgroup-mapping:
      seata_tx_group: default
    grouplist:
      default: 192.168.203.200:8091
  data-source-proxy-mode: XA

feign:
  sentinel:
    enabled: true

management:
  endpoints:
    web:
      exposure:
        include: '*'

事务功能测试验证

正常业务流程测试

  1. 依次启动 Nacos、Seata Server、库存服务、订单服务

  2. 浏览器访问地址 http://localhost:18082/order/create

  3. 分别查看 sca_order、sca_stock 两个独立数据库

  4. 预期结果:订单表新增一条数据,商品 1 库存数量同步减少 1

异常回滚流程测试

  1. 取消订单业务类中模拟异常代码注释

  2. 重新启动服务并再次调用订单创建接口

  3. 观测数据库数据状态

  4. 预期结果:业务抛出异常后,库存扣减、订单新增操作全部回滚,两张数据表数据无变动,Seata 控制台可查询到全局事务回滚日志