主页 > 手机  > 

SpringCloud整合seata,XA、AT、TCC、SAGA模式

SpringCloud整合seata,XA、AT、TCC、SAGA模式
参考资料:

SpringCloud-Alibaba搭建

SpringCloud-nacos整合

Seata部署

参考demo(及学习资料)

seata官网

参考视频​​​​​c(AT模式的UNDO_LOG讲的可能有点问题,但是很通俗易懂)

 参考视频2(不太通俗易懂)

沽泡付费视频(就是对着官网念) 

上述三个视频的参考资料


准备环境:

        该教程默认已经有如下环境,如果没有可以参考上述教程:

部署了Nacos部署了Seata搭建了SpringCloud 并且版本已经做到了统一(只要保证JVM里面使用的jar包版本和部署的版本一致即可)同一系列版本见Wiki

 


模拟事件-搭建demo:

        因为seata是分布式事务,所以这个例子需要多个微服务,多个库进行联动

创建数据库

        首先创建两个数据库

CREATE DATABASE /*!32312 IF NOT EXISTS*/`seata_db1` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */; USE `seata_db1`; /*Table structure for table `money` */ DROP TABLE IF EXISTS `money`; CREATE TABLE `money` ( `id` varchar(32) NOT NULL COMMENT '主键', `user_name` varchar(32) DEFAULT NULL COMMENT '名字', `bank_money` double DEFAULT NULL COMMENT '银行余额', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; /*Data for the table `money` */ insert into `money`(`id`,`user_name`,`bank_money`) values ('084d862fa66c4c82886a4b0bb9214ab1','张三',100);

 

CREATE DATABASE /*!32312 IF NOT EXISTS*/`seata_db2` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */; USE `seata_db2`; /*Table structure for table `order_details` */ DROP TABLE IF EXISTS `order_details`; CREATE TABLE `order_details` ( `id` varchar(32) NOT NULL COMMENT '主键', `goods_name` varchar(32) DEFAULT NULL COMMENT '商品名', `count` int(11) DEFAULT NULL COMMENT '仓库余额', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; /*Data for the table `order_details` */ insert into `order_details`(`id`,`goods_name`,`count`) values ('7072809e86834335843bc918c33074ec','desk',50);

 

 创建示例微服务

         用搭建好的SpringClud框架,默认已经整合了nacos,并且已经部署好了seata

        关于微服务的创建,这里就不再赘述了,可以参考该文章

          参考上述文章,创建两个微服务   seata-demo1和seata-demo2,然后进行相关依赖的导入

        这里是将所有的依赖加入到了公共服务里面,请根据个人情况使用,在common-service服务中的pom.xml文件,

Lombok依赖

<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> </dependency>

 控制层依赖

<!-- springboot starter web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>

 mybatis-plus及数据库相关

<dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.1.tmp</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <!--引入mysql--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.11</version> </dependency>

fegin组件

<!--fegin组件--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency>

 nacos相关(使用时默认的,如果nacos不是上述系列的则需要指定版本)

<!--引入nacos client的依赖--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!--引入nacos config 依赖--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency>

还有一些bootstrap.yml启动的辅助依赖

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-loadbalancer</artifactId> </dependency>

最后添加的依赖如下:

然后将公共依赖引入创建的两个微服务,参考

<dependency> <groupId>org.example</groupId> <artifactId>common-service</artifactId> <version>1.0-SNAPSHOT</version> </dependency>

将相关配置交给nacos管理,可以参考文章,这里只是贴代码,不赘述

seata-demo1 bootstrap.yml spring: profiles: include: uat

 bootstrap-uat.yml

spring: config: use-legacy-processing: true profiles.active: uat application: name: seata-demo1 cloud: nacos: config: server-addr: localhost:8848 group: DEFAULT_GROUP username: nacos password: nacos file-extension: yaml refresh-enabled: true

然后将相关配置放入到nacos中

server: port: 8081 spring: nacos: discovery: server-addr: localhost:8848 username: nacos password: nacos datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_db1?characterEncoding=UTF-8 username: root password: 123888

 seata-demo2 bootstrap.yml spring: profiles: include: uat

bootstrap-uat.yml spring: config: use-legacy-processing: true profiles.active: uat application: name: seata-demo2 cloud: nacos: config: server-addr: localhost:8848 group: DEFAULT_GROUP username: nacos password: nacos file-extension: yaml refresh-enabled: true

 然后将相关配置放入到nacos中

server: port: 8080 spring: nacos: discovery: server-addr: localhost:8848 username: nacos password: nacos datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_db2?characterEncoding=UTF-8 username: root password: 123888

这样的话,就可以连接到数据库了,然后编写 mybatis-plus 的三层,参考文章,这里将不再赘述,仅仅是贴代码

seata-demo1 package org.example.entity; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; @Data @TableName("money") public class Money { @TableId private String id; @TableField("user_name") private String userName; @TableField("bank_money") private Double bankMoney; } package org.example.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.example.entity.Money; public interface MoneyMapper extends BaseMapper<Money> { } package org.example.service; import com.baomidou.mybatisplus.extension.service.IService; import org.example.entity.Money; public interface MoneyService extends IService<Money> { } package org.example.service; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.springframework.stereotype.Service; import org.example.dao.MoneyMapper; import org.example.entity.Money; import org.springframework.transaction.annotation.Transactional; @Service public class MoneyServiceImpl extends ServiceImpl<MoneyMapper, Money> implements MoneyService{ }

seata-demo2 @Data @TableName("order_details") public class OrderDetails { @TableId private String id; @TableField("goods_name") private String goodsName; @TableField("count") private int count; } package org.example.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.example.entity.OrderDetails; public interface OrderDetailsMapper extends BaseMapper<OrderDetails> { } package org.example.service; import com.baomidou.mybatisplus.extension.service.IService; import org.example.entity.OrderDetails; public interface OrderDetailsService extends IService<OrderDetails> { }

 然后编写业务代码,分布式事务的调用,模拟扣款和扣库存

seata-demo1

feign调用

package org.example.client; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @FeignClient(value = "seata-demo2-service") public interface Demo2Client { @RequestMapping("order/addCount") ResponseEntity<String> addCount(@RequestParam("count") int count); } MoneyService package org.example.service; import com.baomidou.mybatisplus.extension.service.IService; import org.example.entity.Money; public interface MoneyService extends IService<Money> { void changeMoney(Money money,Double num); } MoneyServiceImpl package org.example.service; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.springframework.stereotype.Service; import org.example.dao.MoneyMapper; import org.example.entity.Money; @Service public class MoneyServiceImpl extends ServiceImpl<MoneyMapper, Money> implements MoneyService{ public void changeMoney(Money money, Double num) { Double bankMoney = money.getBankMoney(); if (bankMoney >= num) { money.setBankMoney(bankMoney - num); this.updateById(money); }else{ throw new RuntimeException("余额不足"); } } } TransService package org.example.service; public interface TransService { void globalRollBackDemo(int orderCount); } TransServiceImpl package org.example.service; import lombok.extern.slf4j.Slf4j; import org.example.client.Demo2Client; import org.example.entity.Money; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; @Service @Slf4j public class TransServiceImpl implements TransService { @Autowired private MoneyService moneyService; @Autowired private Demo2Client demo2Client; /** * 分布式事务 回滚例子 */ public void globalRollBackDemo(int orderCount) { log.info("=================开始扣钱========================"); List<Money> list = moneyService.list(); Money money = list.get(0); Double bankMoney = money.getBankMoney(); log.info("=================查询到银行余额为:"+bankMoney+",开始扣款:20================="); moneyService.changeMoney(money,20D); log.info("=================扣款成功,开始扣库存,扣减库存为:"+orderCount+"========================"); demo2Client.addCount(orderCount); } } MoneyController package org.example.controller; import org.example.client.Demo2Client; import org.example.service.TransService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; @Controller @RequestMapping("money") public class MoneyController { @Autowired private TransService transService; @RequestMapping("test/globalTransactional/fail") public ResponseEntity<String> testGTRollBack() { transService.globalRollBackDemo(60); return new ResponseEntity<String>("sucess",HttpStatus.OK); } @RequestMapping("test/globalTransactional/sucess") public ResponseEntity<String> testGTSucess() { transService.globalRollBackDemo(40); return new ResponseEntity<String>("sucess",HttpStatus.OK); } }

SeataDemo1Application package org.example; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients; @SpringBootApplication @EnableDiscoveryClient @MapperScan("org.example.dao") @EnableFeignClients public class SeataDemo1Application { public static void main( String[] args ) { SpringApplication.run(SeataDemo1Application.class,args); } }

seata-demo2 OrderDetailsController package org.example.controller; import org.example.entity.OrderDetails; import org.example.service.OrderDetailsService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @Controller @RequestMapping("order") public class OrderDetailsController { @Autowired private OrderDetailsService orderService; @RequestMapping("addCount") public ResponseEntity<String> addCount(@RequestParam("count") Integer count) { orderService.updateOrderDetails(count); return new ResponseEntity<String>("success",HttpStatus.OK); } } OrderDetailsService package org.example.service; import com.baomidou.mybatisplus.extension.service.IService; import org.example.entity.OrderDetails; public interface OrderDetailsService extends IService<OrderDetails> { // 修改库存剩余 void updateOrderDetails(Integer count); } OrderDetailsServiceImpl package org.example.service; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; import org.example.dao.OrderDetailsMapper; import org.example.entity.OrderDetails; import org.springframework.stereotype.Service; import java.util.List; @Service @Slf4j public class OrderDetailsServiceImpl extends ServiceImpl<OrderDetailsMapper, OrderDetails> implements OrderDetailsService { @Override public void updateOrderDetails(Integer count) { List<OrderDetails> list = this.list(); // 取第一个进行 OrderDetails orderDetails = list.get(0); if (orderDetails.getCount()>=count) { int sum = orderDetails.getCount() - count; orderDetails.setCount(sum); this.updateById(orderDetails); log.info("库存扣减成功"); }else{ throw new RuntimeException("库存为:"+list.get(0).getCount()+"不足,开始回滚"); } } } SeataDemo2Application package org.example; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients; @SpringBootApplication @EnableDiscoveryClient @MapperScan("org.example.dao") @EnableFeignClients public class SeataDemo2Application { public static void main( String[] args ) { SpringApplication.run(SeataDemo2Application.class,args); } }


XA模式: 导入依赖  可以在公共的服务,或者私有的服务中导入seata的jar包jar包的版本如果和Spring-Cloud是同一个系列的(见版本选择),可以不指定,如果不是一个系列的,则需要指定版本(和部署的版本一致)本教程是将jar包放入到公共服务中,并且指定版本号 <!-- 注意一定要引入对版本,要引入spring-cloud版本seata,而不是springboot版本的seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <!-- 排除掉springcloud默认的seata版本,以免版本不一致出现问题--> <exclusions> <exclusion> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> </exclusion> <exclusion> <groupId>io.seata</groupId> <artifactId>seata-all</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.7.0</version> </dependency>

添加seata的相关配置

 在nacos管理的配置中,找到seata-demo1和seata-demo2的配置,添加如下配置:

#seata客户端配置 seata: enabled: true application-id: seata_tx tx-service-group: seata_tx_group service: vgroup-mapping: seata_tx_group: default registry: type: nacos nacos: application: seata-server server-addr: 127.0.0.1:8848 namespace: group: SEATA_GROUP data-source-proxy-mode: XA

开启分布式事务---XA模式 在需要开启分布式事务的地方使用   @GlobalTransactional 修饰即可本地事务,建议加上  @Transactional例如我们要对,上面的扣钱和扣库存业务,添加分布式事务,只需要在seata-demo1的 TransServiceImpl添加@GlobalTransactional

然后在本地事务使用@Transactional修饰如:seata-demo1中的 MoneyServiceImpl 以及seata-demo2中的 OrderDetailsServiceImpl

这样,分布式事务XA模式就实现了 


AT模式:

        AT模式和XA模式在实现上基本没有区别,只需要将YML文件中的XA改为AT即可


TCC模式:

        关于TCC模式的原理,业务悬挂,空回滚等原理,以及TCC专用的注解这些就不说了,详情可以看上面的视频,这里仅仅是讲怎么实现。

        基于上面搭出的框架,按照下面的步骤来编写TCC模式

        TCC模式是在AT模式上进行改造的,所以YML的写法和AT模式一样

修改YML配置 seata-demo1 server: port: 8081 spring: nacos: discovery: server-addr: localhost:8848 username: nacos password: nacos datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_db1?characterEncoding=UTF-8 username: root password: 123888 #seata客户端配置 seata: enabled: true application-id: seata_tx tx-service-group: seata_tx_group service: vgroup-mapping: seata_tx_group: default registry: type: nacos nacos: application: seata-server server-addr: 127.0.0.1:8848 namespace: group: SEATA_GROUP data-source-proxy-mode: AT

 seata-demo2 server: port: 8080 spring: nacos: discovery: server-addr: localhost:8848 username: nacos password: nacos datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_db2?characterEncoding=UTF-8 username: root password: 123888 #seata客户端配置 seata: enabled: true application-id: seata_tx tx-service-group: seata_tx_group service: vgroup-mapping: seata_tx_group: default registry: type: nacos nacos: application: seata-server server-addr: 127.0.0.1:8848 namespace: group: SEATA_GROUP data-source-proxy-mode: AT

创建数据库

         可以在上述seata_db1数据库中创建两个数据库:t_account和t_account_tx

SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for t_account -- ---------------------------- DROP TABLE IF EXISTS `t_account`; CREATE TABLE `t_account` ( `id` INT NOT NULL AUTO_INCREMENT, `user_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `money` INT NULL DEFAULT 0, PRIMARY KEY (`id`) USING BTREE ) ENGINE = INNODB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC; -- ---------------------------- -- Records of t_account -- ---------------------------- INSERT INTO `t_account` VALUES (1, 'U100000', 900); -- ---------------------------- -- Table structure for t_account_tx -- ---------------------------- DROP TABLE IF EXISTS `t_account_tx`; CREATE TABLE `t_account_tx` ( `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键', `tx_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '事务id', `freeze_money` INT NULL DEFAULT NULL COMMENT '冻结金额', `state` INT NULL DEFAULT NULL COMMENT '状态 0try 1confirm 2cancel', PRIMARY KEY (`id`) USING BTREE ) ENGINE = INNODB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC;

        可以在上述seata_db2数据库中创建两个数据库:t_order和t_order_tx

SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for t_order -- ---------------------------- DROP TABLE IF EXISTS `t_order`; CREATE TABLE `t_order` ( `id` INT NOT NULL AUTO_INCREMENT, `user_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `commodity_code` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `count` INT NULL DEFAULT 0, `money` INT NULL DEFAULT 0, PRIMARY KEY (`id`) USING BTREE ) ENGINE = INNODB AUTO_INCREMENT = 25 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC; -- ---------------------------- -- Records of t_order -- ---------------------------- -- ---------------------------- -- Table structure for t_order_tx -- ---------------------------- DROP TABLE IF EXISTS `t_order_tx`; CREATE TABLE `t_order_tx` ( `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '主键', `tx_id` VARCHAR(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '事务id', `state` INT NULL DEFAULT NULL COMMENT '状态 0try 1confirm 2cancel', PRIMARY KEY (`id`) USING BTREE ) ENGINE = INNODB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = DYNAMIC; 编写Mybatis-plus的三层及业务层  seata-demo1 实体 package org.example.entity; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; @Data @TableName("t_account") public class Account { @TableId(type = IdType.AUTO) private Integer id; private String userId; private int money; }

package org.example.entity; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; @Data @TableName("t_account_tx") public class AccountTX { public static final int STATE_TRY = 0; public static final int STATE_CONFIRM = 1; public static final int STATE_CANCEL = 2; @TableId(type = IdType.AUTO) private Integer id; private String txId; private int freezeMoney; private int state = STATE_TRY; } dao层 package org.example.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.example.entity.Account; public interface AccountMapper extends BaseMapper<Account> { }

package org.example.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.example.entity.AccountTX; public interface AccountTXMapper extends BaseMapper<AccountTX> { } service层 interface接口 package org.example.service; import com.baomidou.mybatisplus.extension.service.IService; import org.example.entity.Account; public interface IAccountService extends IService<Account> { /** * 账户扣款 * @param userId * @param money * @return */ void reduce(String userId, int money); } package org.example.service; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /** * TCC 二阶段提交业务接口 */ @LocalTCC public interface IAccountTCCService { /** * try-预扣款 */ @TwoPhaseBusinessAction(name="tryReduce", commitMethod = "confirm", rollbackMethod = "cancel") void tryReduce(@BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "money") int money); /** * confirm-提交 * @param ctx * @return */ boolean confirm(BusinessActionContext ctx); /** * cancel-回滚 * @param ctx * @return */ boolean cancel(BusinessActionContext ctx); } 实现层 package org.example.service; import org.example.dao.AccountMapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.example.entity.Account; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements IAccountService { @Override @Transactional public void reduce(String userId, int money) { Account one = lambdaQuery().eq(Account::getUserId, userId).one(); if(one != null && one.getMoney() < money){ throw new RuntimeException("Not Enough Money ..."); } lambdaUpdate().setSql("money = money - " + money) .eq(Account::getUserId, userId) .update(); } } package org.example.service; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import io.seata.core.context.RootContext; import io.seata.rm.tcc.api.BusinessActionContext; import org.example.dao.AccountMapper; import org.example.dao.AccountTXMapper; import org.example.entity.Account; import org.example.entity.AccountTX; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class AccountTCCServiceImpl implements IAccountTCCService { @Autowired private AccountMapper accountMapper; @Autowired private AccountTXMapper accountTXMapper; @Override public void tryReduce(String userId, int money) { System.err.println("-----------tryReduce-------------" + RootContext.getXID()); //业务悬挂 AccountTX accountTX = accountTXMapper.selectOne(new LambdaQueryWrapper<AccountTX>().eq(AccountTX::getTxId, RootContext.getXID())); if (accountTX != null){ //存在,说明已经canel执行过类,拒绝服务 return; } Account one = accountMapper.selectOne(new LambdaQueryWrapper<Account>().eq(Account::getUserId, userId)); if(one != null && one.getMoney() < money){ throw new RuntimeException("Not Enough Money ..."); } LambdaUpdateWrapper<Account> wrapper = new LambdaUpdateWrapper<>(); wrapper.setSql("money = money - " + money); wrapper.eq(Account::getUserId, userId); accountMapper.update(null, wrapper); AccountTX tx = new AccountTX(); tx.setFreezeMoney(money); tx.setTxId(RootContext.getXID()); tx.setState(AccountTX.STATE_TRY); accountTXMapper.insert(tx); } @Override public boolean confirm(BusinessActionContext ctx) { System.err.println("-----------confirm-------------"); //删除记录 int ret = accountTXMapper.delete(new LambdaQueryWrapper<AccountTX>().eq(AccountTX::getTxId, ctx.getXid())); return ret == 1; } @Override public boolean cancel(BusinessActionContext ctx) { System.err.println("-----------cancel-------------"); String userId = ctx.getActionContext("userId").toString(); String money = ctx.getActionContext("money").toString(); AccountTX accountTX = accountTXMapper.selectOne(new LambdaQueryWrapper<AccountTX>().eq(AccountTX::getTxId, ctx.getXid())); if (accountTX == null){ //为空, 空回滚 accountTX = new AccountTX(); accountTX.setTxId(ctx.getXid()); accountTX.setState(AccountTX.STATE_CANCEL); if(money != null){ accountTX.setFreezeMoney(Integer.parseInt(money)); } accountTXMapper.insert(accountTX); return true; } //幂等处理 if(accountTX.getState() == AccountTX.STATE_CANCEL){ return true; } //恢复余额 accountMapper.update(null, new LambdaUpdateWrapper<Account>() .setSql("money = money + " + money) .eq(Account::getUserId, userId)); accountTX.setFreezeMoney(0); accountTX.setState(AccountTX.STATE_CANCEL); int ret = accountTXMapper.updateById(accountTX); return ret == 1; } } controller层 package org.example.controller; import org.example.service.IAccountTCCService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("accounts") public class AccountController { //@Autowired //private IAccountService accountService; @Autowired private IAccountTCCService accountTCCService; @GetMapping(value = "/reduce") public String reduce(String userId, int money) { try { accountTCCService.tryReduce(userId, money); } catch (Exception exx) { exx.printStackTrace(); return "FAIL"; } return "SUCCESS"; } } 启动类 package org.example; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients; @SpringBootApplication @EnableDiscoveryClient @MapperScan("org.example.dao") @EnableFeignClients public class SeataDemo1Application { public static void main( String[] args ) { SpringApplication.run(SeataDemo1Application.class,args); } }

seata-demo2 实体层 package org.example.entity; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; @Data @TableName("t_order") public class Order { @TableId(type = IdType.AUTO) private Integer id; private String userId; private String commodityCode; private Integer count; private Integer money; }

package org.example.entity; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; @Data @TableName("t_order_tx") public class OrderTX { public static final int STATE_TRY = 0; public static final int STATE_CONFIRM = 1; public static final int STATE_CANCEL = 2; @TableId(type = IdType.AUTO) private Integer id; private String txId; private int state = STATE_TRY; } dao层 package org.example.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.example.entity.Order; public interface OrderMapper extends BaseMapper<Order> { }

package org.example.dao; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.example.entity.OrderTX; public interface OrderTXMapper extends BaseMapper<OrderTX> { } service层 interface层 package org.example.service; import com.baomidou.mybatisplus.extension.service.IService; import org.example.entity.Order; public interface IOrderService extends IService<Order> { /** * 创建订单 */ void create(String userId, String commodityCode, int orderCount); } package org.example.service; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /** * TCC 二阶段提交业务接口 */ @LocalTCC public interface IOrderTCCService { /** * try-预扣款 */ @TwoPhaseBusinessAction(name="tryCreate", commitMethod = "confirm", rollbackMethod = "cancel") void tryCreate(@BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "commodityCode") String commodityCode, @BusinessActionContextParameter(paramName = "orderCount") int orderCount); /** * confirm-提交 * @param ctx * @return */ boolean confirm(BusinessActionContext ctx); /** * cancel-回滚 * @param ctx * @return */ boolean cancel(BusinessActionContext ctx); } 实现层 package org.example.service; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.example.entity.Order; import org.example.feign.AccountFeignClient; import org.example.dao.OrderMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service @Transactional public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService { @Autowired private AccountFeignClient accountFeignClient; @Override @Transactional public void create(String userId, String commodityCode, int count) { // 定单总价 = 订购数量(count) * 商品单价(100) int orderMoney = count * 100; // 生成订单 Order order = new Order(); order.setCount(count); order.setCommodityCode(commodityCode); order.setUserId(userId); order.setMoney(orderMoney); super.save(order); // 调用账户余额扣减 String result = accountFeignClient.reduce(userId, orderMoney); if (!"SUCCESS".equals(result)) { throw new RuntimeException("Failed to call Account Service. "); } } } package org.example.service; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import io.seata.core.context.RootContext; import io.seata.rm.tcc.api.BusinessActionContext; import org.example.dao.OrderMapper; import org.example.dao.OrderTXMapper; import org.example.entity.Order; import org.example.entity.OrderTX; import org.example.feign.AccountFeignClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class OrderTCCServiceImpl implements IOrderTCCService { @Autowired private AccountFeignClient accountFeignClient; @Autowired private OrderMapper orderMapper; @Autowired private OrderTXMapper orderTXMapper; @Override public void tryCreate(String userId, String commodityCode, int count) { System.err.println("---------tryCreate-----------"); //业务悬挂 OrderTX orderTX = orderTXMapper.selectOne(new LambdaQueryWrapper<OrderTX>().eq(OrderTX::getTxId, RootContext.getXID())); if (orderTX != null){ //存在,说明已经canel执行过类,拒绝服务 return; } // 定单总价 = 订购数量(count) * 商品单价(100) int orderMoney = count * 100; // 生成订单 Order order = new Order(); order.setCount(count); order.setCommodityCode(commodityCode); order.setUserId(userId); order.setMoney(orderMoney); orderMapper.insert(order); OrderTX tx = new OrderTX(); tx.setTxId(RootContext.getXID()); tx.setState(OrderTX.STATE_TRY); orderTXMapper.insert(tx); // 调用账户余额扣减 String result = accountFeignClient.reduce(userId, orderMoney); if (!"SUCCESS".equals(result)) { throw new RuntimeException("Failed to call Account Service. "); } } @Override public boolean confirm(BusinessActionContext ctx) { System.err.println("---------confirm-----------"); //删除记录 int ret = orderTXMapper.delete(new LambdaQueryWrapper<OrderTX>().eq(OrderTX::getTxId, ctx.getXid())); return ret == 1; } @Override public boolean cancel(BusinessActionContext ctx) { System.err.println("---------cancel-----------" ); String userId = ctx.getActionContext("userId").toString(); String commodityCode = ctx.getActionContext("commodityCode").toString(); OrderTX orderTX = orderTXMapper.selectOne(new LambdaQueryWrapper<OrderTX>().eq(OrderTX::getTxId, ctx.getXid())); if (orderTX == null){ //为空, 空回滚 orderTX = new OrderTX(); orderTX.setTxId(ctx.getXid()); orderTX.setState(OrderTX.STATE_CANCEL); orderTXMapper.insert(orderTX); return true; } //幂等处理 if(orderTX.getState() == OrderTX.STATE_CANCEL){ return true; } //恢复余额 orderMapper.delete(new LambdaQueryWrapper<Order>().eq(Order::getUserId, userId).eq(Order::getCommodityCode, commodityCode)); orderTX.setState(OrderTX.STATE_CANCEL); int ret = orderTXMapper.updateById(orderTX); return ret == 1; } } controller层 package org.example.controller; import org.example.service.IOrderTCCService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("orders") public class OrderController { @Autowired private IOrderTCCService orderTCCService; @GetMapping(value = "/create") public String create(String userId, String commodityCode, int orderCount) { try { orderTCCService.tryCreate(userId, commodityCode, orderCount); } catch (Exception exx) { exx.printStackTrace(); return "FAIL"; } return "SUCCESS"; } } feign层 package org.example.feign; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; @FeignClient(name = "seata-demo2-service") public interface AccountFeignClient { @GetMapping("/accounts/reduce") String reduce(@RequestParam("userId") String userId, @RequestParam("money") int money); } 启动类 package org.example; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients; @SpringBootApplication @EnableDiscoveryClient @MapperScan("org.example.dao") @EnableFeignClients public class SeataDemo2Application { public static void main( String[] args ) { SpringApplication.run(SeataDemo2Application.class,args); } }

测试

正常:http://localhost:8088/businesses/purchase?rollback=false&count=2

超库存:http://localhost:8088/businesses/purchase?rollback=false&count=12

超余额:http://localhost:8088/businesses/purchase?rollback=false&count=8


SAGA模式:

        saga模式适用于长事务,如银行API调用,等繁杂的事务处理

标签:

SpringCloud整合seata,XA、AT、TCC、SAGA模式由讯客互联手机栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“SpringCloud整合seata,XA、AT、TCC、SAGA模式