主页 > 其他  > 

经验分享:用一张表解决并发冲突!数据库事务锁的核心实现逻辑

经验分享:用一张表解决并发冲突!数据库事务锁的核心实现逻辑
背景

对于一些内部使用的管理系统来说,可能没有引入Redis,又想基于现有的基础设施处理并发问题,而数据库是每个应用都避不开的基础设施之一,因此分享个我曾经维护过的一个系统中,使用数据库表来实现事务锁的方式。

之前在文章Java业务功能并发问题处理中实现了使用MySQL行锁、Redis分布式锁来处理业务并发问题,这次来填坑了,如果想了解其他并发问题处理方式和区别,可以看看文章Java业务功能并发问题处理哈。

业务流程说明

方案分析 适用场景 应用服务有多个实例,但是数据库是单实例;没有用上Redis的应用服务,想通过现有的基础设施解决并发数据问题 待改进措施 设置超时机制:当出现锁无法及时释放时需要手动删除表数据,可以设置逻辑删除字段或者定时器删除过期数据;重试获取锁机制:设置一定的循环次数,当获取不到锁时休眠200毫秒再次获取,直到循环次数用尽后再返回失败;锁重入支持:通过增加加锁次数字段让当同一个线程可以重复获取锁 程序实现过程 框架及工具说明 技术框架:SpringBoot、MyBatis、Maven数据库:MySQL测试工具:Apifox表设计及代码说明: 唯一索引:需要有一个用于判断唯一的字段,在数据库表中通过指定唯一索引来实现;加锁的线程号:避免A线程加的锁,被B线程删除;锁的可见性要单独事务:添加事务锁的逻辑应在我们执行业务逻辑的事务之前,且不能跟业务逻辑的事务在一块,否则在事务提交前其他线程根本看不到这个锁,也就达不到我们锁的目的了;为了我们的锁更方便使用,也可以将加锁逻辑抽到注解中实现,注解的实现流程: 在pom文件中引入spring-boot-starter-aop编写自定义注解ConcurrencyLock实现切面类(Aspect)逻辑 代码展示

为了能让大家更关注加解锁逻辑,本文只保留主要代码,参考链接处会放置码云(gitee)的源码地址(或者点击此处跳转); 另外,本文就不展示注解方式的使用了,以免占用篇幅。

代码结构图

实体类 /** * 并发锁实体类 */ public class ConcurrencyLockBean { /** * 数据库主键 */ private Long id; /** * 操作节点 */ private String businessNode; /** * 订单唯一编号 */ private String businessUniqueNo; /** * 线程ID */ private Long threadId; /** * 创建日期 */ private Date creationDate; } /** * 订单实体类 */ @Setter @Getter @ToString public class OrderInfoBean { /** * 自增长主键 */ private int id; /** * 订单号 */ private String orderNo; /** * 物料数量 */ private Integer itemQty; } ConcurrencyLockServiceImpl.java @Slf4j @Service public class ConcurrencyLockServiceImpl implements ConcurrencyLockService { ConcurrencyLockMapper mapper; /** * service类注入 */ @Autowired ConcurrencyLockServiceImpl(ConcurrencyLockMapper mapper) { this.mapper = mapper; } @Override public Boolean tryLock(String businessNode, String businessUniqueNo) { long threadId = Thread.currentThread().getId(); ConcurrencyLockBean concurrencyLock = mapper.selectConcurrencyLock(businessNode, businessUniqueNo); if (concurrencyLock != null) { log.info("{}数据正在操作中,请稍后", threadId); return false; } ConcurrencyLockBean lock = new ConcurrencyLockBean(); lock.setBusinessNode(businessNode); lock.setBusinessUniqueNo(businessUniqueNo); lock.setThreadId(threadId); try { int insertCount = mapper.insertConcurrencyLock(lock); if (insertCount == 0) { log.info("{}获取锁失败,请稍后重试", threadId); return false; } } catch (Exception e) { log.info("{}获取锁异常,请稍后重试", threadId); return false; } log.info("{}完成锁表插入", threadId); return true; } @Override public void unLock(String businessNode, String businessUniqueNo) { ConcurrencyLockBean lock = new ConcurrencyLockBean(); long threadId = Thread.currentThread().getId(); lock.setThreadId(threadId); lock.setBusinessNode(businessNode); lock.setBusinessUniqueNo(businessUniqueNo); mapper.deleteConcurrencyLock(lock); log.info("{}执行解锁完毕", threadId); } } ConcurrencyLockMapper.java import org.apache.ibatis.annotations.Param; public interface ConcurrencyLockMapper { /** * 根据业务节点和唯一业务号查询锁 */ ConcurrencyLockBean selectConcurrencyLock(@Param("businessNode") String businessNode, @Param("businessUniqueNo") String businessUniqueNo); /** * 插入锁 */ int insertConcurrencyLock(ConcurrencyLockBean lock); /** * 删除锁 */ int deleteConcurrencyLock(ConcurrencyLockBean lock); } ConcurrencyLockMapper.xml <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.runningrookie.mapper.ConcurrencyLockMapper"> <select id="selectConcurrencyLock" resultType="com.runningrookie.domain.ConcurrencyLockBean"> SELECT THREAD_ID, BUSINESS_NODE, BUSINESS_UNIQUE_NO, CREATION_DATE FROM concurrency_lock WHERE BUSINESS_UNIQUE_NO = #{businessUniqueNo} AND BUSINESS_NODE = #{businessNode} </select> <insert id="insertConcurrencyLock" useGeneratedKeys="true" keyProperty="id"> INSERT INTO concurrency_lock ( THREAD_ID, BUSINESS_NODE, BUSINESS_UNIQUE_NO, CREATION_DATE) VALUES (#{threadId}, #{businessNode}, #{businessUniqueNo}, NOW()); </insert> <delete id="deleteConcurrencyLock"> DELETE FROM concurrency_lock WHERE THREAD_ID = #{threadId} and BUSINESS_NODE = #{businessNode} and BUSINESS_UNIQUE_NO = #{businessUniqueNo} </delete> </mapper> ConcurrencyLock.java注解 @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface ConcurrencyLock { String businessNode(); String businessUniqueNoKey(); } ConcurrencyLockAspect.java注解类 @Aspect @Component @Slf4j public class ConcurrencyLockAspect { ConcurrencyLockService concurrencyLockService; @Autowired ConcurrencyLockAspect(ConcurrencyLockService concurrencyLockService) { this.concurrencyLockService = concurrencyLockService; } // 环绕切面 @Around("@annotation(concurrencyLock)") public Object around(ProceedingJoinPoint joinPoint, ConcurrencyLock concurrencyLock) throws Throwable { long threadId = Thread.currentThread().getId(); Object[] args = joinPoint.getArgs(); if (args.length == 0) { return joinPoint.proceed(); } // 通过反射获取值 String invokeMethodName = "get" + concurrencyLock.businessUniqueNoKey().substring(0, 1).toUpperCase() + concurrencyLock.businessUniqueNoKey().substring(1); // 获取Order类的Class对象 Class<?> clazz = args[0].getClass(); // 获取getOrderNo方法的Method对象 Method method = clazz.getMethod(invokeMethodName); // 调用getOrderNo方法并获取返回值 String businessUniqueNo = method.invoke(args[0]).toString(); Boolean isSuccessLock = concurrencyLockService.tryLock(concurrencyLock.businessNode(), businessUniqueNo); if (!isSuccessLock) { log.info("{}加锁失败,请稍后重试", threadId); // 生成与切点方法相同的返回对象 return AjaxResult.error("加锁失败,请稍后重试"); } try { log.info("{}开始执行业务逻辑", threadId); joinPoint.proceed(); } finally { concurrencyLockService.unLock(concurrencyLock.businessNode(), businessUniqueNo); } return joinPoint.proceed(); } } OrderInfoController.java @RestController @RequestMapping("/orderInfo") public class OrderInfoController { OrderInfoService orderInfoService; @Autowired private OrderInfoController(OrderInfoService orderInfoService) { this.orderInfoService = orderInfoService; } @PostMapping public AjaxResult saveOrderInfo(@RequestBody OrderInfoBean bean) { return orderInfoService.saveOrderInfo(bean); } } OrderServiceImpl.java /** * 订单逻辑代码 */ @Slf4j @Service public class OrderInfoServiceImpl implements OrderInfoService { ConcurrencyLockService concurrencyLockService; /** * service类注入 */ @Autowired OrderInfoServiceImpl(ConcurrencyLockService concurrencyLockService) { this.concurrencyLockService = concurrencyLockService; } @Override public AjaxResult saveOrderInfo(OrderInfoBean bean) { long threadId = Thread.currentThread().getId(); final String businessNode = "插入"; Boolean isSuccessLock = concurrencyLockService.tryLock(businessNode, bean.getOrderNo()); if (!isSuccessLock) { return AjaxResult.error("加锁失败,请稍后重试"); } try { log.info("{}开始执行业务逻辑", threadId); // TODO:模拟业务逻辑耗时 Thread.sleep(1500); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { concurrencyLockService.unLock(businessNode, bean.getOrderNo()); } return AjaxResult.success(); } @Override @ConcurrencyLock(businessNode = "插入", businessUniqueNoKey = "orderNo") @Transactional public AjaxResult saveOrderInfoByAnnotation(OrderInfoBean bean) { // TODO:模拟业务逻辑耗时 Thread.sleep(1500); return AjaxResult.success(); } } pom.xml相关依赖

在dependencies中添加下列依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.4</version> </dependency> <!-- Mysql驱动包 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> 事务处理表的表结构 CREATE TABLE `concurrency_lock` ( `ID` int NOT NULL AUTO_INCREMENT COMMENT '主键', `THREAD_ID` int DEFAULT NULL COMMENT '线程号', `BUSINESS_NODE` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '操作节点', `BUSINESS_UNIQUE_NO` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '单据号', `CREATION_DATE` datetime DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`ID`), UNIQUE KEY `uni_business_no` (`BUSINESS_UNIQUE_NO`,`BUSINESS_NODE`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; 测试输出结果

使用Apifox并发发送5次请求,可以看到实际成功获取到锁并执行的只有一个线程

17:08:00.449 [http-nio-8080-exec-1] c.r.service.impl.ConcurrencyLockServiceImpl - 40完成锁表插入 17:08:00.462 [http-nio-8080-exec-1] c.runningrookie.service.impl.OrderInfoServiceImpl - 40开始执行业务逻辑 17:08:00.573 [http-nio-8080-exec-5] c.r.service.impl.ConcurrencyLockServiceImpl - 44获取锁异常,请稍后重试 17:08:00.573 [http-nio-8080-exec-4] c.r.service.impl.ConcurrencyLockServiceImpl - 43获取锁异常,请稍后重试 17:08:00.573 [http-nio-8080-exec-3] c.r.service.impl.ConcurrencyLockServiceImpl - 42获取锁异常,请稍后重试 17:08:00.573 [http-nio-8080-exec-2] c.r.service.impl.ConcurrencyLockServiceImpl - 41获取锁异常,请稍后重试 17:08:00.574 [http-nio-8080-exec-1] c.r.service.impl.ConcurrencyLockServiceImpl - 40执行解锁完毕 参考链接

gitee代码仓库地址:数据库并发锁

标签:

经验分享:用一张表解决并发冲突!数据库事务锁的核心实现逻辑由讯客互联其他栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“经验分享:用一张表解决并发冲突!数据库事务锁的核心实现逻辑