主页 > 互联网  > 

2025-alibaba-分布式事务组件-Seata

2025-alibaba-分布式事务组件-Seata
分布式事务

 实现思路:两阶段提交协议(2PC)

2PC存在的问题 同步阻塞问题

2PC 中的参与者是阻塞的。在第一阶段收到请求后就会预先锁定资源,一直到 commit 后才会释放。

单点故障

由于协调者的重要性,一旦协调者TM发生故障,参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。

数据不一致

若协调者第二阶段发送提交请求时崩溃,可能部分参与者收到commit请求提交了事务,而另一部分参与者未收到commit请求而放弃事务,从而造成数据不一致的问题。

.Seata是什么 Seata的生命周期

1、TM 请求 TC 开启一个全局事务。TC 会生成一个 XID 作为该全局事务的编号。XID会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起。 2、RM 请求 TC 将本地事务注册为全局事务的分支事务,通过全局事务的 XID 进行关联。 5、TM 请求 TC 告诉 XID 对应的全局事务是进行提交还是回滚。 4、TC 驱动 RM 们将 XID 对应的自己的本地事务进行提交还是回滚。

原理  Seata AT模式的设计思路

Seata AT模式的核心是对业务无侵入,是一种改进后的两阶段提交,其设计思路如下:

一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。二阶段: 提交异步化,非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。

一阶段

业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。核心在于对业务sql进行解析,转换成undolog,并同时入库,这是怎么做的呢?

二阶段

分布式事务操作成功,则TC通知RM异步删除undolog

分布式事务操作失败,TM向TC发送回滚请求,RM 收到协调器TC发来的回滚请求,通过 XID 和 Branch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚。

seata 快速安装启动

下载地址

快速启动 | Apache Seata

sql 脚本

创建数据库 并运行脚本文件 创建表结构如下

修改seata的配置文件 指定nacos的注册和配置中心

server: port: 7091 spring: application: name: seata-server logging: config: classpath:logback-spring.xml file: path: ${user.home}/logs/seata extend: logstash-appender: destination: 127.0.0.1:4560 kafka-appender: bootstrap-servers: 127.0.0.1:9092 topic: logback_to_logstash console: user: username: seata password: seata # 配置注册中心 和 服务发现中心 ---- 注意、注意、注意 group: SEATA_GROUP seata: config: # support: nacos, consul, apollo, zk, etcd3 type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: xulk group: SEATA_GROUP data-id: seataServer.properties username: nacos password: nacos registry: # support: nacos, eureka, redis, zk, consul, etcd3, sofa type: nacos 指定nacos为配置中心 type: nacos nacos: application: seata-server server-addr: 127.0.0.1:8848 group: SEATA_GROUP namespace: xulk cluster: default username: nacos password: nacos store: # support: file 、 db 、 redis mode: file # server: # service-port: 8091 #If not configured, the default is '${server.port} + 1000' security: secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 tokenValidityInMilliseconds: 1800000 ignore: urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login

nacos 配置文件来源

 

需要修改的地方

 注意 如果数据库是 8.0 驱动需要修改 com.mysql.cj.jdbc.Driver

store.mode=db store.lock.mode=db store.session.mode=db store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.cj.jdbc.Driver store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true store.db.user=root store.db.password=root store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.distributedLockTable=distributed_lock store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 开始启动

 启动成功 

登录   http://127.0.0.1:7091 用户名 / 密码 : seata / seata

nacos 注册成功

配置事务分组, 要与client配置的事务分组一致 事务分组:seata的资源逻辑,可以按微服务的需要,在应用程序(客户端)对自行定义事务分组,每组取一个名字。集群:seata-server服务端一个或多个节点组成的集群cluster。 应用程序(客户端)使用时需要指定事务逻辑分组与Seata服务端集群的映射关系。

springboot 项目开始集成seata

使用AT模式

微服务导入seata依赖 <!-- seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency> 微服务对应数据库中添加undo_log表(仅AT模式) 每个数据库都要添加 日志表   

CREATE TABLE `undo_log` (   `branch_id` bigint NOT NULL COMMENT 'branch transaction id',   `xid` varchar(128) NOT NULL COMMENT 'global transaction id',   `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',   `rollback_info` longblob NOT NULL COMMENT 'rollback info',   `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',   `log_created` datetime(6) NOT NULL COMMENT 'create datetime',   `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',   UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`),   KEY `ix_log_created` (`log_created`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';

微服务application.yml中添加seata配置 server: port: 8020 spring: application: name: order-service cloud: nacos: discovery: server-addr: 127.0.0.1:8848 username: nacos password: nacos namespace: xulk group: SEATA_GROUP datasource: type: com.alibaba.druid.pool.DruidDataSource druid: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_at_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false username: root password: root initial-size: 10 max-active: 100 min-idle: 10 max-wait: 60000 pool-prepared-statements: true max-pool-prepared-statement-per-connection-size: 20 time-between-eviction-runs-millis: 60000 min-evictable-idle-time-millis: 300000 test-while-idle: true test-on-borrow: false test-on-return: false stat-view-servlet: enabled: true url-pattern: /druid/* filter: stat: log-slow-sql: true slow-sql-millis: 1000 merge-sql: false wall: config: multi-statement-allow: true logging: level: com.tuling: debug seata: application-id: ${spring.application.name} # seata 服务分组,要与服务端配置service.vgroup_mapping的后缀对应 tx-service-group: default_tx_group registry: # 指定nacos作为注册中心 type: nacos nacos: application: seata-server server-addr: 127.0.0.1:8848 namespace: xulk group: SEATA_GROUP username: nacos password: nacos config: # 指定nacos作为配置中心 type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: xulk group: SEATA_GROUP data-id: seataServer.properties username: nacos password: nacos #暴露actuator端点 management: endpoints: web: exposure: include: '*' feign: sentinel: enabled: true controller层 @RestController @RequestMapping("/order") @Slf4j public class OrderController { @Autowired private OrderService orderService; // /order/createOrder @PostMapping("/createOrder") public ResultVo createOrder(@RequestBody OrderVo orderVo) throws Exception { log.info("收到下单请求,用户:{}, 商品编号:{}", orderVo.getUserId(), orderVo.getCommodityCode()); Order order = orderService.saveOrder(orderVo); return ResultVo.ok().put("order",order); } } 测试的 service

要保证哪个接口的事务就在哪个接口上添加 全局事务的注解 

@GlobalTransactional(name="createOrder",rollbackFor=Exception.class)

import com.tuling.datasource.entity.Order; import com.tuling.datasource.entity.OrderStatus; import com.tuling.datasource.mapper.OrderMapper; import com.tuling.order.feign.AccountFeignService; import com.tuling.order.feign.StorageFeignService; import com.tuling.order.service.OrderService; import com.tuling.order.vo.OrderVo; import io.seata.core.context.RootContext; import io.seata.spring.annotation.GlobalTransactional; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @Slf4j public class OrderServiceImpl implements OrderService { @Autowired private OrderMapper orderMapper; @Autowired private AccountFeignService accountFeignService; @Autowired private StorageFeignService storageFeignService; @Override // @Transactional @GlobalTransactional(name="createOrder",rollbackFor=Exception.class) public Order saveOrder(OrderVo orderVo) { log.info("=============用户下单================="); log.info("当前 XID: {}", RootContext.getXID()); // 保存订单 Order order = new Order(); order.setUserId(orderVo.getUserId()); order.setCommodityCode(orderVo.getCommodityCode()); order.setCount(orderVo.getCount()); order.setMoney(orderVo.getMoney()); order.setStatus(OrderStatus.INIT.getValue()); Integer saveOrderRecord = orderMapper.insert(order); log.info("保存订单{}", saveOrderRecord > 0 ? "成功" : "失败"); //扣减库存 storageFeignService.deduct(orderVo.getCommodityCode(), orderVo.getCount()); //扣减余额 Boolean debit= accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney()); // if(!debit){ // // 解决 feign整合sentinel降级导致Seata失效的处理 // throw new RuntimeException("账户服务异常降级了"); // } //更新订单 Integer updateOrderRecord = orderMapper.updateOrderStatus(order.getId(),OrderStatus.SUCCESS.getValue()); log.info("更新订单id:{} {}", order.getId(), updateOrderRecord > 0 ? "成功" : "失败"); return order; } } feign接口 //feign 接口1 import org.springframework.cloud.openfeign.FeignClient; import org.springframework.stereotype.Repository; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @FeignClient(name="storage-service",path="/storage") @Repository public interface StorageFeignService { @RequestMapping(path = "/deduct") Boolean deduct(@RequestParam("commodityCode") String commodityCode,@RequestParam("count") Integer count); } //feign 接口2 import org.springframework.cloud.openfeign.FeignClient; import org.springframework.stereotype.Repository; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @FeignClient(name = "account-service",path = "/account") @Repository public interface AccountFeignService { @RequestMapping("/debit") Boolean debit(@RequestParam("userId") String userId,@RequestParam("money") int money); } 通过feign远程调用的  Controller 接口  // Controller 接口 2 import com.tuling.account.service.AccountService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/account") public class AccountController { @Autowired private AccountService accountService; @RequestMapping("/debit") public Boolean debit(String userId, int money) throws Exception { // 用户账户扣款 accountService.debit(userId, money); return true; } } // Controller 接口 2 import com.tuling.storage.service.StorageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/storage") public class StorageController { @Autowired private StorageService storageService; @RequestMapping(path = "/deduct") public Boolean deduct(String commodityCode, Integer count) { // 扣减库存 storageService.deduct(commodityCode, count); return true; } } 通过feign远程调用的  Controller 接口 调用的 service层 扣减库存  接口需要添加本地事务  @Transactional import com.tuling.datasource.entity.Storage; import com.tuling.datasource.mapper.StorageMapper; import com.tuling.storage.service.StorageService; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service @Slf4j public class StorageServiceImpl implements StorageService { @Autowired private StorageMapper storageMapper; @Transactional @Override public void deduct(String commodityCode, int count){ log.info("=============扣减库存================="); log.info("当前 XID: {}", RootContext.getXID()); // 检查库存 checkStock(commodityCode,count); log.info("开始扣减 {} 库存", commodityCode); Integer record = storageMapper.reduceStorage(commodityCode,count); log.info("扣减 {} 库存结果:{}", commodityCode, record > 0 ? "操作成功" : "扣减库存失败"); } private void checkStock(String commodityCode, int count){ log.info("检查 {} 库存", commodityCode); Storage storage = storageMapper.findByCommodityCode(commodityCode); if (storage.getCount() < count) { log.warn("{} 库存不足,当前库存:{}", commodityCode, count); throw new RuntimeException("库存不足"); } } } 扣减用户金额  接口需要添加本地事务  @Transactional import com.tuling.account.service.AccountService; import com.tuling.datasource.entity.Account; import com.tuling.datasource.mapper.AccountMapper; import io.seata.core.context.RootContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service @Slf4j public class AccountServiceImpl implements AccountService { private static final String ERROR_USER_ID = "1002"; @Autowired private AccountMapper accountMapper; /** * 扣减用户金额 * @param userId * @param money */ @Transactional @Override public void debit(String userId, int money){ log.info("=============用户账户扣款================="); log.info("当前 XID: {}", RootContext.getXID()); checkBalance(userId, money); log.info("开始扣减用户 {} 余额", userId); Integer record = accountMapper.reduceBalance(userId,money); // if (ERROR_USER_ID.equals(userId)) { // // 模拟异常 // throw new RuntimeException("account branch exception"); // } log.info("扣减用户 {} 余额结果:{}", userId, record > 0 ? "操作成功" : "扣减余额失败"); } private void checkBalance(String userId, int money){ log.info("检查用户 {} 余额", userId); Account account = accountMapper.selectByUserId(userId); if (account.getMoney() < money) { log.warn("用户 {} 余额不足,当前余额:{}", userId, account.getMoney()); throw new RuntimeException("余额不足"); } } } 开始测试 请求地址 请求地址  localhost:8020/order/createOrder 请求参数

{

    "userId": "1001",

    "commodityCode": "2001",

    "count": 2,

    "money": 70

}

进入订单的断点

订单表插入数据

订单库的 undo_log 表 插入回滚的数据

此时库存还没扣除

放过断点     进入 扣除库存的接口,还没扣除余额(未触发异常信息)

此时seata 锁住了 订单和库存

 事务信息

 此时已经扣除库存

此时放过断点 由于余额不足,触发异常信息

库存回滚

订单回滚

订单 undo_log 表异步删除

注意、注意、注意  这里有个坑

如果调用远程服务的时候使用熔断降级的功能,由于降级是通过 try catch 的方式实现的,进入降级的方法后,就相当于进入catch模块了,此时全局事务就不能捕捉到异常信息 就无法回滚了,可以通过手动判断的方式,如果出现异常后 手动判断是否降级异常等信息,然后手动在抛出异常

 

//扣减余额 Boolean debit= accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney()); if(!debit){ // 解决 feign整合sentinel降级导致Seata失效的处理 throw new RuntimeException("账户服务异常降级了"); } 数据库脚本 SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for account_tbl -- ---------------------------- DROP TABLE IF EXISTS `account_tbl`; CREATE TABLE `account_tbl` ( `id` int(0) NOT NULL AUTO_INCREMENT, `user_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL, `money` int(0) NULL DEFAULT 0, PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `user_id`(`user_id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of account_tbl -- ---------------------------- INSERT INTO `account_tbl` VALUES (1, '1001', 60); INSERT INTO `account_tbl` VALUES (2, '1002', 50); -- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `branch_id` bigint(0) NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'global transaction id', `context` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int(0) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE, INDEX `ix_log_created`(`log_created`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1; SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for order_tbl -- ---------------------------- DROP TABLE IF EXISTS `order_tbl`; CREATE TABLE `order_tbl` ( `id` int(0) NOT NULL AUTO_INCREMENT, `user_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL, `commodity_code` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL, `count` int(0) NULL DEFAULT 0, `money` int(0) NULL DEFAULT 0, `status` int(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of order_tbl -- ---------------------------- INSERT INTO `order_tbl` VALUES (4, '1001', '2001', 2, 10, 1); INSERT INTO `order_tbl` VALUES (7, '1001', '2001', 2, 10, 1); -- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `branch_id` bigint(0) NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'global transaction id', `context` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int(0) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE, INDEX `ix_log_created`(`log_created`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1; SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for storage_tbl -- ---------------------------- DROP TABLE IF EXISTS `storage_tbl`; CREATE TABLE `storage_tbl` ( `id` int(0) NOT NULL AUTO_INCREMENT, `commodity_code` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL, `count` int(0) NULL DEFAULT 0, PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `commodity_code`(`commodity_code`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of storage_tbl -- ---------------------------- INSERT INTO `storage_tbl` VALUES (1, '2001', 994); -- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `branch_id` bigint(0) NOT NULL COMMENT 'branch transaction id', `xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'global transaction id', `context` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int(0) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE, INDEX `ix_log_created`(`log_created`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;

标签:

2025-alibaba-分布式事务组件-Seata由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“2025-alibaba-分布式事务组件-Seata