2025-alibaba-分布式事务组件-Seata
- 互联网
- 2025-09-06 17:24:02

分布式事务 实现思路:两阶段提交协议(2PC) 2PC存在的问题 同步阻塞问题
2PC 中的参与者是阻塞的。在第一阶段收到请求后就会预先锁定资源,一直到 commit 后才会释放。
单点故障由于协调者的重要性,一旦协调者TM发生故障,参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。
数据不一致若协调者第二阶段发送提交请求时崩溃,可能部分参与者收到commit请求提交了事务,而另一部分参与者未收到commit请求而放弃事务,从而造成数据不一致的问题。
.Seata是什么 Seata的生命周期1、TM 请求 TC 开启一个全局事务。TC 会生成一个 XID 作为该全局事务的编号。XID会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起。 2、RM 请求 TC 将本地事务注册为全局事务的分支事务,通过全局事务的 XID 进行关联。 5、TM 请求 TC 告诉 XID 对应的全局事务是进行提交还是回滚。 4、TC 驱动 RM 们将 XID 对应的自己的本地事务进行提交还是回滚。
原理 Seata AT模式的设计思路Seata AT模式的核心是对业务无侵入,是一种改进后的两阶段提交,其设计思路如下:
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。二阶段: 提交异步化,非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。一阶段
业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。核心在于对业务sql进行解析,转换成undolog,并同时入库,这是怎么做的呢?
二阶段
分布式事务操作成功,则TC通知RM异步删除undolog 分布式事务操作失败,TM向TC发送回滚请求,RM 收到协调器TC发来的回滚请求,通过 XID 和 Branch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚。 seata 快速安装启动下载地址
快速启动 | Apache Seata
sql 脚本 创建数据库 并运行脚本文件 创建表结构如下 修改seata的配置文件 指定nacos的注册和配置中心 server: port: 7091 spring: application: name: seata-server logging: config: classpath:logback-spring.xml file: path: ${user.home}/logs/seata extend: logstash-appender: destination: 127.0.0.1:4560 kafka-appender: bootstrap-servers: 127.0.0.1:9092 topic: logback_to_logstash console: user: username: seata password: seata # 配置注册中心 和 服务发现中心 ---- 注意、注意、注意 group: SEATA_GROUP seata: config: # support: nacos, consul, apollo, zk, etcd3 type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: xulk group: SEATA_GROUP data-id: seataServer.properties username: nacos password: nacos registry: # support: nacos, eureka, redis, zk, consul, etcd3, sofa type: nacos 指定nacos为配置中心 type: nacos nacos: application: seata-server server-addr: 127.0.0.1:8848 group: SEATA_GROUP namespace: xulk cluster: default username: nacos password: nacos store: # support: file 、 db 、 redis mode: file # server: # service-port: 8091 #If not configured, the default is '${server.port} + 1000' security: secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 tokenValidityInMilliseconds: 1800000 ignore: urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login nacos 配置文件来源需要修改的地方
注意 如果数据库是 8.0 驱动需要修改 com.mysql.cj.jdbc.Driver
store.mode=db store.lock.mode=db store.session.mode=db store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.cj.jdbc.Driver store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true store.db.user=root store.db.password=root store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.distributedLockTable=distributed_lock store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 开始启动 启动成功 登录 http://127.0.0.1:7091 用户名 / 密码 : seata / seata nacos 注册成功 配置事务分组, 要与client配置的事务分组一致 事务分组:seata的资源逻辑,可以按微服务的需要,在应用程序(客户端)对自行定义事务分组,每组取一个名字。集群:seata-server服务端一个或多个节点组成的集群cluster。 应用程序(客户端)使用时需要指定事务逻辑分组与Seata服务端集群的映射关系。 springboot 项目开始集成seata使用AT模式
微服务导入seata依赖 <!-- seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> 微服务对应数据库中添加undo_log表(仅AT模式) 每个数据库都要添加 日志表CREATE TABLE `undo_log` ( `branch_id` bigint NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) NOT NULL COMMENT 'global transaction id', `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`), KEY `ix_log_created` (`log_created`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';
微服务application.yml中添加seata配置 server: port: 8020 spring: application: name: order-service cloud: nacos: discovery: server-addr: 127.0.0.1:8848 username: nacos password: nacos namespace: xulk group: SEATA_GROUP datasource: type: com.alibaba.druid.pool.DruidDataSource druid: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_at_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false username: root password: root initial-size: 10 max-active: 100 min-idle: 10 max-wait: 60000 pool-prepared-statements: true max-pool-prepared-statement-per-connection-size: 20 time-between-eviction-runs-millis: 60000 min-evictable-idle-time-millis: 300000 test-while-idle: true test-on-borrow: false test-on-return: false stat-view-servlet: enabled: true url-pattern: /druid/* filter: stat: log-slow-sql: true slow-sql-millis: 1000 merge-sql: false wall: config: multi-statement-allow: true logging: level: com.tuling: debug seata: application-id: ${spring.application.name} # seata 服务分组,要与服务端配置service.vgroup_mapping的后缀对应 tx-service-group: default_tx_group registry: # 指定nacos作为注册中心 type: nacos nacos: application: seata-server server-addr: 127.0.0.1:8848 namespace: xulk group: SEATA_GROUP username: nacos password: nacos config: # 指定nacos作为配置中心 type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: xulk group: SEATA_GROUP data-id: seataServer.properties username: nacos password: nacos #暴露actuator端点 management: endpoints: web: exposure: include: '*' feign: sentinel: enabled: true controller层 @RestController @RequestMapping("/order") @Slf4j public class OrderController { @Autowired private OrderService orderService; // /order/createOrder @PostMapping("/createOrder") public ResultVo createOrder(@RequestBody OrderVo orderVo) throws Exception { log.info("收到下单请求,用户:{}, 商品编号:{}", orderVo.getUserId(), orderVo.getCommodityCode()); Order order = orderService.saveOrder(orderVo); return ResultVo.ok().put("order",order); } } 测试的 service要保证哪个接口的事务就在哪个接口上添加 全局事务的注解
@GlobalTransactional(name="createOrder",rollbackFor=Exception.class)
import com.tuling.datasource.entity.Order; import com.tuling.datasource.entity.OrderStatus; import com.tuling.datasource.mapper.OrderMapper; import com.tuling.order.feign.AccountFeignService; import com.tuling.order.feign.StorageFeignService; import com.tuling.order.service.OrderService; import com.tuling.order.vo.OrderVo; import io.seata.core.context.RootContext; import io.seata.spring.annotation.GlobalTransactional; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @Slf4j public class OrderServiceImpl implements OrderService { @Autowired private OrderMapper orderMapper; @Autowired private AccountFeignService accountFeignService; @Autowired private StorageFeignService storageFeignService; @Override // @Transactional @GlobalTransactional(name="createOrder",rollbackFor=Exception.class) public Order saveOrder(OrderVo orderVo) { log.info("=============用户下单================="); log.info("当前 XID: {}", RootContext.getXID()); // 保存订单 Order order = new Order(); order.setUserId(orderVo.getUserId()); order.setCommodityCode(orderVo.getCommodityCode()); order.setCount(orderVo.getCount()); order.setMoney(orderVo.getMoney()); order.setStatus(OrderStatus.INIT.getValue()); Integer saveOrderRecord = orderMapper.insert(order); log.info("保存订单{}", saveOrderRecord > 0 ? "成功" : "失败"); //扣减库存 storageFeignService.deduct(orderVo.getCommodityCode(), orderVo.getCount()); //扣减余额 Boolean debit= accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney()); // if(!debit){ // // 解决 feign整合sentinel降级导致Seata失效的处理 // throw new RuntimeException("账户服务异常降级了"); // } //更新订单 Integer updateOrderRecord = orderMapper.updateOrderStatus(order.getId(),OrderStatus.SUCCESS.getValue()); log.info("更新订单id:{} {}", order.getId(), updateOrderRecord > 0 ? "成功" : "失败"); return order; } } feign接口 //feign 接口1 import org.springframework.cloud.openfeign.FeignClient; import org.springframework.stereotype.Repository; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @FeignClient(name="storage-service",path="/storage") @Repository public interface StorageFeignService { @RequestMapping(path = "/deduct") Boolean deduct(@RequestParam("commodityCode") String commodityCode,@RequestParam("count") Integer count); } //feign 接口2 import org.springframework.cloud.openfeign.FeignClient; import org.springframework.stereotype.Repository; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @FeignClient(name = "account-service",path = "/account") @Repository public interface AccountFeignService { @RequestMapping("/debit") Boolean debit(@RequestParam("userId") String userId,@RequestParam("money") int money); } 通过feign远程调用的 Controller 接口 // Controller 接口 2 import com.tuling.account.service.AccountService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/account") public class AccountController { @Autowired private AccountService accountService; @RequestMapping("/debit") public Boolean debit(String userId, int money) throws Exception { // 用户账户扣款 accountService.debit(userId, money); return true; } } // Controller 接口 2 import com.tuling.storage.service.StorageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/storage") public class StorageController { @Autowired private StorageService storageService; @RequestMapping(path = "/deduct") public Boolean deduct(String commodityCode, Integer count) { // 扣减库存 storageService.deduct(commodityCode, count); return true; } } 通过feign远程调用的 Controller 接口 调用的 service层 扣减库存 接口需要添加本地事务 @Transactional import com.tuling.datasource.entity.Storage; import com.tuling.datasource.mapper.StorageMapper; import com.tuling.storage.service.StorageService; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service @Slf4j public class StorageServiceImpl implements StorageService { @Autowired private StorageMapper storageMapper; @Transactional @Override public void deduct(String commodityCode, int count){ log.info("=============扣减库存================="); log.info("当前 XID: {}", RootContext.getXID()); // 检查库存 checkStock(commodityCode,count); log.info("开始扣减 {} 库存", commodityCode); Integer record = storageMapper.reduceStorage(commodityCode,count); log.info("扣减 {} 库存结果:{}", commodityCode, record > 0 ? "操作成功" : "扣减库存失败"); } private void checkStock(String commodityCode, int count){ log.info("检查 {} 库存", commodityCode); Storage storage = storageMapper.findByCommodityCode(commodityCode); if (storage.getCount() < count) { log.warn("{} 库存不足,当前库存:{}", commodityCode, count); throw new RuntimeException("库存不足"); } } } 扣减用户金额 接口需要添加本地事务 @Transactional import com.tuling.account.service.AccountService; import com.tuling.datasource.entity.Account; import com.tuling.datasource.mapper.AccountMapper; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service @Slf4j public class AccountServiceImpl implements AccountService { private static final String ERROR_USER_ID = "1002"; @Autowired private AccountMapper accountMapper; /** * 扣减用户金额 * @param userId * @param money */ @Transactional @Override public void debit(String userId, int money){ log.info("=============用户账户扣款================="); log.info("当前 XID: {}", RootContext.getXID()); checkBalance(userId, money); log.info("开始扣减用户 {} 余额", userId); Integer record = accountMapper.reduceBalance(userId,money); // if (ERROR_USER_ID.equals(userId)) { // // 模拟异常 // throw new RuntimeException("account branch exception"); // } log.info("扣减用户 {} 余额结果:{}", userId, record > 0 ? "操作成功" : "扣减余额失败"); } private void checkBalance(String userId, int money){ log.info("检查用户 {} 余额", userId); Account account = accountMapper.selectByUserId(userId); if (account.getMoney() < money) { log.warn("用户 {} 余额不足,当前余额:{}", userId, account.getMoney()); throw new RuntimeException("余额不足"); } } } 开始测试 请求地址 请求地址 localhost:8020/order/createOrder 请求参数{
"userId": "1001",
"commodityCode": "2001",
"count": 2,
"money": 70
}
进入订单的断点 订单表插入数据 订单库的 undo_log 表 插入回滚的数据 此时库存还没扣除 放过断点 进入 扣除库存的接口,还没扣除余额(未触发异常信息) 此时seata 锁住了 订单和库存 事务信息 此时已经扣除库存 此时放过断点 由于余额不足,触发异常信息 库存回滚 订单回滚 订单 undo_log 表异步删除 注意、注意、注意 这里有个坑如果调用远程服务的时候使用熔断降级的功能,由于降级是通过 try catch 的方式实现的,进入降级的方法后,就相当于进入catch模块了,此时全局事务就不能捕捉到异常信息 就无法回滚了,可以通过手动判断的方式,如果出现异常后 手动判断是否降级异常等信息,然后手动在抛出异常
//扣减余额 Boolean debit= accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney()); if(!debit){ // 解决 feign整合sentinel降级导致Seata失效的处理 throw new RuntimeException("账户服务异常降级了"); } 数据库脚本 SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for account_tbl -- ---------------------------- DROP TABLE IF EXISTS `account_tbl`; CREATE TABLE `account_tbl` ( `id` int(0) NOT NULL AUTO_INCREMENT, `user_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL, `money` int(0) NULL DEFAULT 0, PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `user_id`(`user_id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of account_tbl -- ---------------------------- INSERT INTO `account_tbl` VALUES (1, '1001', 60); INSERT INTO `account_tbl` VALUES (2, '1002', 50); -- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `branch_id` bigint(0) NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'global transaction id', `context` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int(0) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE, INDEX `ix_log_created`(`log_created`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1; SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for order_tbl -- ---------------------------- DROP TABLE IF EXISTS `order_tbl`; CREATE TABLE `order_tbl` ( `id` int(0) NOT NULL AUTO_INCREMENT, `user_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL, `commodity_code` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL, `count` int(0) NULL DEFAULT 0, `money` int(0) NULL DEFAULT 0, `status` int(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of order_tbl -- ---------------------------- INSERT INTO `order_tbl` VALUES (4, '1001', '2001', 2, 10, 1); INSERT INTO `order_tbl` VALUES (7, '1001', '2001', 2, 10, 1); -- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `branch_id` bigint(0) NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'global transaction id', `context` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int(0) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE, INDEX `ix_log_created`(`log_created`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1; SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for storage_tbl -- ---------------------------- DROP TABLE IF EXISTS `storage_tbl`; CREATE TABLE `storage_tbl` ( `id` int(0) NOT NULL AUTO_INCREMENT, `commodity_code` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL, `count` int(0) NULL DEFAULT 0, PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `commodity_code`(`commodity_code`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of storage_tbl -- ---------------------------- INSERT INTO `storage_tbl` VALUES (1, '2001', 994); -- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `branch_id` bigint(0) NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'global transaction id', `context` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int(0) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE, INDEX `ix_log_created`(`log_created`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;
2025-alibaba-分布式事务组件-Seata由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“2025-alibaba-分布式事务组件-Seata”