主页 > 软件开发  > 

从零搭建微服务项目Pro(第1-3章——Quartz定时任务模块整合)

从零搭建微服务项目Pro(第1-3章——Quartz定时任务模块整合)
前言:

在企业项目中,往往有定时任务发布的需求,比如每天晚9点将今日数据备份一次,或每月一号将上月的销售数据邮件发送给对应的工作人员。显然这些操作不可能是人工到时间点调用一次接口,需要编写专门的模块完成任务的调度。

Quartz 是一个功能强大的开源任务调度框架,广泛应用于 Java 应用程序中。它允许开发者根据时间表(如固定时间、间隔时间或 Cron 表达式)来调度任务。Quartz 提供了丰富的 API 和灵活的配置选项,适用于从简单的任务调度到复杂的分布式任务调度场景。

尽管quartz库本身提供很多方法,但在定时任务中,外界往往只能感知到任务的启动与暂停,以及原始quartz库对于每一种定时任务的实现都需要新添加一种实现接口,不方便拓展的同时程序的耦合度较高。经过前面两章的迭代已经成功开发了数据库持久化、便于维护、日志记录的定时任务模块,具体可参考下方链接,本章将介绍如何将模块整合至微服务项目中。

从零搭建微服务项目Pro(第1-2章——Quartz实现定时任务模块优化)-CSDN博客 blog.csdn.net/wlf2030/article/details/145933078?spm=1001.2014.3001.5501

本章最终源码链接如下:

wlf728050719/SpringCloudPro1-3 github /wlf728050719/SpringCloudPro1-3以及本专栏会持续更新微服务项目,每一章的项目都会基于前一章项目进行功能的完善,欢迎小伙伴们关注!同时如果只是对单章感兴趣也不用从头看,只需下载前一章项目即可,每一章都会有前置项目准备部分,跟着操作就能实现上一章的最终效果,当然如果是一直跟着做可以直接跳过这一部分。专栏目录链接如下,其中Base篇为基础微服务搭建,Pro篇为复杂模块实现。

从零搭建微服务项目(全)-CSDN博客 blog.csdn.net/wlf2030/article/details/145799620


一、Quartz系统介绍

核心概念

Job: 定义要执行的任务,实现 Job 接口并重写 execute 方法。

JobDetail: 包含 Job 的详细信息,如名称、组名等。

Trigger: 定义任务的触发条件,如开始时间、结束时间、重复频率等。

Scheduler: 负责调度任务,将 JobDetail 和 Trigger 绑定并管理任务执行。

JobDataMap: 任务数据映射,用于在任务执行时传递数据。

底层实现

任务存储

Quartz 支持内存存储(RAMJobStore)和数据库存储(JDBCJobStore)。RAMJobStore 将任务存储在内存中,适合简单的应用场景;JDBCJobStore 将任务存储在数据库中,适合分布式和持久化的场景。

线程池

Quartz 使用线程池来执行任务。默认情况下,Quartz 使用 SimpleThreadPool,开发者可以配置线程池的大小以适应不同的负载。

触发器调度

Quartz 使用 QuartzSchedulerThread 来检查触发器并根据时间表调度任务。该线程会定期检查触发器,并根据触发器的状态决定是否执行任务。

任务执行

当触发器触发时,Quartz 会从线程池中获取一个线程来执行任务。任务执行过程中,Quartz 会处理任务的并发性、异常处理等。

Scheduler 是 Quartz 框架的核心接口,负责管理和调度任务(Job)和触发器(Trigger)。以下是 Scheduler 接口中常用的方法及其说明:

1. 调度器生命周期管理

这些方法用于启动、暂停、恢复和关闭调度器。

方法名说明void start()启动调度器,开始执行任务。调度器启动后,触发器会根据时间表触发任务。void startDelayed(int seconds)延迟指定秒数后启动调度器。void standby()暂停调度器,暂停后触发器不会触发任务,但任务和触发器的状态会保留。void shutdown()关闭调度器,停止所有任务执行。void shutdown(boolean waitForJobsToComplete)关闭调度器,并决定是否等待正在执行的任务完成。true 表示等待任务完成后再关闭。
2. 任务和触发器管理

这些方法用于添加、删除、暂停、恢复任务和触发器。

方法名说明void scheduleJob(JobDetail jobDetail, Trigger trigger)将任务(JobDetail)和触发器(Trigger)绑定,并添加到调度器中。Date scheduleJob(Trigger trigger)添加触发器,并返回触发器首次触发的时间。void addJob(JobDetail jobDetail, boolean replace)添加任务到调度器中。replace 为 true 时,如果任务已存在则替换。boolean deleteJob(JobKey jobKey)根据任务键(JobKey)删除任务。返回 true 表示删除成功。boolean deleteJobs(List<JobKey> jobKeys)批量删除任务。void pauseJob(JobKey jobKey)暂停指定任务。void pauseJobs(GroupMatcher<JobKey> matcher)暂停匹配指定组的所有任务。void resumeJob(JobKey jobKey)恢复指定任务。void resumeJobs(GroupMatcher<JobKey> matcher)恢复匹配指定组的所有任务。void pauseTrigger(TriggerKey triggerKey)暂停指定触发器。void pauseTriggers(GroupMatcher<TriggerKey> matcher)暂停匹配指定组的所有触发器。void resumeTrigger(TriggerKey triggerKey)恢复指定触发器。void resumeTriggers(GroupMatcher<TriggerKey> matcher)恢复匹配指定组的所有触发器。
3. 任务和触发器查询

这些方法用于查询调度器中的任务和触发器。

方法名说明JobDetail getJobDetail(JobKey jobKey)根据任务键(JobKey)获取任务详情(JobDetail)。List<? extends Trigger> getTriggersOfJob(JobKey jobKey)获取与指定任务关联的所有触发器。Trigger getTrigger(TriggerKey triggerKey)根据触发器键(TriggerKey)获取触发器。Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher)获取匹配指定组的所有任务键。Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher)获取匹配指定组的所有触发器键。SchedulerMetaData getMetaData()获取调度器的元数据,如调度器名称、版本、运行状态等。
4. 任务和触发器状态检查

这些方法用于检查任务和触发器的状态。

方法名说明boolean checkExists(JobKey jobKey)检查指定任务是否存在。boolean checkExists(TriggerKey triggerKey)检查指定触发器是否存在。boolean isJobGroupPaused(String groupName)检查指定任务组是否处于暂停状态。boolean isTriggerGroupPaused(String groupName)检查指定触发器组是否处于暂停状态。
5. 任务和触发器操作

这些方法用于直接操作任务和触发器。

方法名说明void triggerJob(JobKey jobKey)立即触发指定任务。void triggerJob(JobKey jobKey, JobDataMap data)立即触发指定任务,并传递额外的任务数据(JobDataMap)。void interrupt(JobKey jobKey)中断正在执行的任务。boolean unscheduleJob(TriggerKey triggerKey)移除指定触发器。boolean unscheduleJobs(List<TriggerKey> triggerKeys)批量移除触发器。
6. 调度器状态检查

这些方法用于检查调度器的运行状态。

方法名说明boolean isStarted()检查调度器是否已启动。boolean isShutdown()检查调度器是否已关闭。boolean isInStandbyMode()检查调度器是否处于暂停状态(standby mode)。
7. 调度器监听器管理

这些方法用于管理调度器的监听器。

方法名说明void addJobListener(JobListener listener)添加任务监听器。void addTriggerListener(TriggerListener listener)添加触发器监听器。void addSchedulerListener(SchedulerListener listener)添加调度器监听器。boolean removeJobListener(String name)移除指定名称的任务监听器。boolean removeTriggerListener(String name)移除指定名称的触发器监听器。
二、前置项目准备

本章使用的微服务项目博客链接如下,内有对应项目源码。

从零搭建微服务项目Base(第7章——微服务网关模块基础实现)-CSDN博客 blog.csdn.net/wlf2030/article/details/1456645271.从github下载对应项目解压,重命名为Pro1_3打开。

2.重命名模块为Pro1_3。

3.父工程pom.xml中<name>改成Pro1_3。

4.选择环境为dev,并重新加载maven。

5.启动nacos(安装和启动见第三章)。

6.进入nacos网页 配置管理->配置列表确认有这些yaml文件。

(如果不是一直跟着专栏做自然是没有的,需要看第四章的环境隔离和配置拉取,记得把父工程pom文件中namespace的值与nacos中命名空间生成的保持一致)

7.配置数据源,更换两服务的resources下yml文件的数据库配置,数据库sql见第一章数据库准备部分。

测试数据库连接 属性->点击数据源->测试连接->输入用户名密码

8.添加运行配置 服务->加号->运行配置类型->spring boot。

启动服务,测试接口。

(只有请求参数中包含token才放行)


三、Quartz模块配置

1.新建spring boot模块。

2.选择导入quartz模块(不导入也行,反正后面要换pom文件)

删除quartz模块下多余文件,只保留src以及pom.xml

父pom中加入quartz模块

查看maven结构是否如下,并选择配置文件为dev(如果不是在本专栏Base第0章有解决方法)

惯例在父pom文件中为新添加的模块配置不同环境下端口

删去quartz模块resources目录下原有配置文件,并创建application.yml内容如下:

server: port: @quartz.port@ spring: cloud: nacos: server-addr: localhost:8848 discovery: cluster-name: HZ namespace: @namespace@ application: name: quartz

将quartz模块下pom内容替换如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http:// .w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>cn.bit</groupId> <artifactId>Pro1_3</artifactId> <version>1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <artifactId>quartz</artifactId> <version>0.0.1-SNAPSHOT</version> <name>quartz</name> <description>quartz</description> <dependencies> <!-- quartz--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- nacos客户端依赖包 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
四、Demo测试

创建两个类,目录结构如下

DateJob内容如下:

package cn.bit.quartz.entity; import java.util.Date; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; public class DateJob implements Job { @Override public void execute(JobExecutionContext arg0) throws JobExecutionException { System.out.println(new Date()); } }

QuartzController内容如下:

package cn.bit.quartz.controller; import cn.bit.quartz.entity.DateJob; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/quartz") public class QuartzController { @GetMapping("/test") public String test() throws SchedulerException { SchedulerFactory schedulerFactory = new StdSchedulerFactory(); Scheduler scheduler = schedulerFactory.getScheduler(); JobDetail job = JobBuilder.newJob(DateJob.class).withIdentity("job1", "group1").build(); Trigger trigger1 = TriggerBuilder.newTrigger() .withIdentity("trigger1", "group1").startNow() .withSchedule(SimpleScheduleBuilder .simpleSchedule() .withIntervalInSeconds(5) .withRepeatCount(5)) .build(); scheduler.scheduleJob(job, trigger1); scheduler.start(); return "ok"; } }

 QuartzApplication设置排除默认数据源设置

启动 Quartz服务,并访问对应接口。

控制台间隔五秒输出当前时间。


五、核心包构建

0.创建数据库对应表以及修改application.yml文件,这里是创建了一个叫db_quartz的库

server: port: @quartz.port@ spring: datasource: url: jdbc:mysql://localhost:3306/db_quartz?useSSL=false username: root password: 15947035212 driver-class-name: com.mysql.jdbc.Driver application: name: quartz mybatis: mapper-locations: classpath:mapper/*Mapper.xml type-aliases-package: cn.bit.pro1_1.core.mapper

数据库sql如下:

create table tb_task ( id int auto_increment primary key, task_name varchar(255) not null, task_group varchar(255) not null, type int not null, bean_name varchar(255) null, class_name varchar(255) null, path varchar(255) null, method_name varchar(255) null, params varchar(255) null, cron_expression varchar(255) not null, description text null, status int default 0 not null, result int null ); create table tb_task_log ( id int auto_increment primary key, task_id int not null, start_time datetime not null, execute_time varchar(255) not null, result tinyint not null, message varchar(255) not null, exception_info text null );

删去之前移除数据源的部分

数据源添加quartz对应的库并测试连接成功

数据库显示有三个

1.删去demo部分创建的两个文件,创建core包,其中包结构如下(实际即为将1_2章项目源码中的core部分粘贴进来,并适当修改,有能力的小伙伴可以直接下载上章源码进行对应修改,当然一步步按顺序做便于理解模块)

2.entity创建任务实体类和日志实体类,代码如下

package cn.bit.quartz.core.entity; import lombok.Data; @Data public class Task { private Integer id; private String taskName; private String taskGroup; private Integer type;//1、java类 2、Spring Bean 3、http请求 private String beanName;//bean名称 private String className;//java类名 private String path;//rest请求路径 private String methodName;//方法名 private String params;//方法参数 private String cronExpression;//cron表达式 private String description;//描述 private Integer status;//任务当前状态 private Integer result;//任务执行结果 } package cn.bit.quartz.core.entity; import lombok.Data; import java.util.Date; @Data public class TaskLog { private Integer id; private Integer taskId; private Date startTime; private String executeTime; private Integer result;//0失败 1成功 private String message;//日志信息 private String exceptionInfo;//异常信息 }

3.enums包下创建任务结果、任务状态、任务类型枚举,具体如下

package cn.bit.quartz.core.enums; import lombok.AllArgsConstructor; import lombok.Getter; @AllArgsConstructor @Getter public enum Result { FAIL(0,"失败"), SUCCESS(1,"成功") ; private final Integer code; private final String desc; } package cn.bit.quartz.core.enums; import lombok.AllArgsConstructor; import lombok.Getter; @AllArgsConstructor @Getter public enum TaskStatus { PAUSE(0, "已发布"), RUNNING(1, "运行中"); private final Integer code; private final String desc; } package cn.bit.quartz.core.enums; import lombok.AllArgsConstructor; import lombok.Getter; @AllArgsConstructor @Getter public enum TaskType { JAVA_CLASS(1,"java类"), SPRING_BEAN(2,"spring bean"), HTTP(3,"http"); private final Integer code; private final String desc; }

4.exception包下创建任务持久化异常和任务执行异常,具体如下

package cn.bit.quartz.core.exception; import lombok.Getter; @Getter public class TaskInvokeException extends Exception { private final Exception exception; public TaskInvokeException(String message,Exception exception) { super(message); this.exception = exception; } } package cn.bit.quartz.core.exception; import cn.bit.quartz.core.entity.Task; import lombok.Getter; @Getter public class TaskRepositoryException extends RuntimeException { private final Task task; public TaskRepositoryException(String message,Task task) { super(message); this.task = task; } }

5.mapper包下创建实体类的持久化接口

package cn.bit.quartz.core.mapper; ;import cn.bit.quartz.core.entity.TaskLog; import org.apache.ibatis.annotations.Mapper; import java.util.List; @Mapper public interface TaskLogMapper { int insert(TaskLog taskLog); List<TaskLog> selectByTaskId(int taskId); } package cn.bit.quartz.core.mapper; import cn.bit.quartz.core.entity.Task; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import java.util.List; @Mapper public interface TaskMapper { List<Task> selectAllTask(); int updateTaskInfo(@Param("task") Task task); int updateTaskStatus(@Param("task") Task task); int insertTask(@Param("task") Task task); int deleteTask(@Param("task") Task task); int setTaskResult(@Param("task") Task task); Task selectTaskByNameAndGroup(@Param("name") String name, @Param("group") String group ); }

6.以及resources下对应mapper.xml,注意目录结构,与application中的mybatis配置有关。

内容分别如下:

<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="cn.bit.quartz.core.mapper.TaskLogMapper"> <resultMap id="TaskLogResultMap" type="cn.bit.quartz.core.entity.TaskLog"> <id property="id" column="id"/> <result property="taskId" column="task_id"/> <result property="startTime" column="start_time"/> <result property="executeTime" column="execute_time"/> <result property="result" column="result"/> <result property="message" column="message"/> <result property="exceptionInfo" column="exception_info"/> </resultMap> <insert id="insert" parameterType="cn.bit.quartz.core.entity.TaskLog"> INSERT INTO tb_task_log(task_id, start_time, execute_time, result, message, exception_info) VALUES (#{taskId}, #{startTime}, #{executeTime}, #{result}, #{message}, #{exceptionInfo}) </insert> <select id="selectByTaskId" resultMap="TaskLogResultMap"> SELECT * FROM tb_task_log WHERE task_id = #{taskId} </select> </mapper> <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="cn.bit.quartz.core.mapper.TaskMapper"> <resultMap id="TaskResultMap" type="cn.bit.quartz.core.entity.Task"> <result property="id" column="id"/> <result property="taskName" column="task_name"/> <result property="taskGroup" column="task_group"/> <result property="type" column="type"/> <result property="beanName" column="bean_name"/> <result property="className" column="class_name"/> <result property="path" column="path"/> <result property="methodName" column="method_name"/> <result property="params" column="params"/> <result property="cronExpression" column="cron_expression"/> <result property="description" column="description"/> <result property="status" column="status"/> <result property="result" column="result"/> </resultMap> <insert id="insertTask"> INSERT INTO tb_task ( task_name, task_group, type, bean_name, class_name, path, method_name, params, cron_expression, description ) VALUES ( #{task.taskName}, #{task.taskGroup}, #{task.type}, #{task.beanName}, #{task.className}, #{task.path}, #{task.methodName}, #{task.params}, #{task.cronExpression}, #{task.description} ) </insert> <delete id="deleteTask"> delete from tb_task where task_name = #{task.taskName} and task_group = #{task.taskGroup}; </delete> <select id="selectAllTask" resultMap="TaskResultMap"> SELECT * FROM tb_task; </select> <select id="selectTaskByNameAndGroup" resultMap="TaskResultMap"> select * from tb_task where task_name = #{name} and task_group = #{group} </select> <update id="updateTaskInfo"> UPDATE tb_task SET type = #{task.type}, bean_name = #{task.beanName}, class_name = #{task.className}, path = #{task.path}, method_name = #{task.methodName}, params = #{task.params}, cron_expression = #{task.cronExpression}, description = #{task.description} WHERE task_name = #{task.taskName} and task_group = #{task.taskGroup}; </update> <update id="updateTaskStatus"> UPDATE tb_task SET status = #{task.status} WHERE task_name = #{task.taskName} and task_group = #{task.taskGroup}; </update> <update id="setTaskResult"> update tb_task set result = #{task.result} where task_name = #{task.taskName} and task_group = #{task.taskGroup}; </update> </mapper>

7.创建对应持久化服务

内容如下:

package cn.bit.quartz.core.service; import cn.bit.quartz.core.entity.Task; import java.util.List; public interface TaskService { List<Task> selectAllTask(); int updateTaskInfo(Task task); int updateTaskStatus(Task task); int insertTask(Task task); int deleteTask(Task task); int setTaskResult(Task task); Task selectTaskByNameAndGroup(String taskName, String groupName); } package cn.bit.quartz.core.service; import cn.bit.quartz.core.entity.TaskLog; import java.util.List; public interface TaskLogService { int insert(TaskLog taskLog); List<TaskLog> selectByTaskId(int taskId); } package cn.bit.quartz.core.service.impl; import cn.bit.quartz.core.entity.TaskLog; import cn.bit.quartz.core.mapper.TaskLogMapper; import cn.bit.quartz.core.service.TaskLogService; import lombok.AllArgsConstructor; import org.springframework.stereotype.Service; import java.util.List; @Service @AllArgsConstructor public class TaskLogServiceImpl implements TaskLogService { private final TaskLogMapper taskLogMapper; @Override public int insert(TaskLog taskLog) { return taskLogMapper.insert(taskLog); } @Override public List<TaskLog> selectByTaskId(int taskId) { return taskLogMapper.selectByTaskId(taskId); } } package cn.bit.quartz.core.service.impl; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.mapper.TaskMapper; import cn.bit.quartz.core.service.TaskService; import lombok.AllArgsConstructor; import org.springframework.stereotype.Service; import java.util.List; @Service @AllArgsConstructor public class TaskServiceImpl implements TaskService { private TaskMapper taskMapper; @Override public List<Task> selectAllTask() { return taskMapper.selectAllTask(); } @Override public int updateTaskInfo(Task task) { return taskMapper.updateTaskInfo(task); } @Override public int updateTaskStatus(Task task) { return taskMapper.updateTaskStatus(task); } @Override public int insertTask(Task task) { return taskMapper.insertTask(task); } @Override public int deleteTask(Task task) { return taskMapper.deleteTask(task); } @Override public int setTaskResult(Task task) { return taskMapper.setTaskResult(task); } @Override public Task selectTaskByNameAndGroup(String taskName, String groupName) { return taskMapper.selectTaskByNameAndGroup(taskName, groupName); } }

8.util包下创建springbean的工具类(bean反射会用到)

package cn.bit.quartz.core.util; import lombok.Getter; import lombok.NonNull; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; @Service public class SpringContextHolder implements ApplicationContextAware { @Getter private static ApplicationContext applicationContext = null; @Override public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException { SpringContextHolder.applicationContext = applicationContext; } }

9.handler包下创建不同任务类型反射类,以及对应工厂类

package cn.bit.quartz.core.handler; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.exception.TaskInvokeException; public interface ITaskHandler { void invoke(Task task) throws TaskInvokeException; } package cn.bit.quartz.core.handler.impl; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.enums.Result; import cn.bit.quartz.core.exception.TaskInvokeException; import cn.bit.quartz.core.handler.ITaskHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.ReflectionUtils; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @Slf4j @Component public class JavaClassTaskHandler implements ITaskHandler { @Override public void invoke(Task task) throws TaskInvokeException { try { Object target; Class<?> clazz; Method method; Result returnValue; clazz = Class.forName(task.getClassName()); target = clazz.newInstance(); if (task.getParams() == null || task.getParams().isEmpty()) { method = target.getClass().getDeclaredMethod(task.getMethodName()); ReflectionUtils.makeAccessible(method); returnValue = (Result) method.invoke(target); } else { method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class); ReflectionUtils.makeAccessible(method); returnValue = (Result) method.invoke(target, task.getParams()); } //判断业务是否执行成功 if (returnValue == null || Result.FAIL.equals(returnValue)) throw new TaskInvokeException("JavaClassTaskHandler方法执行失败",null); } catch (NoSuchMethodException e) { throw new TaskInvokeException("JavaClassTaskHandler找不到对应方法", e); } catch (InvocationTargetException | IllegalAccessException e) { throw new TaskInvokeException("JavaClassTaskHandler执行反射方法异常", e); } catch (ClassCastException e) { throw new TaskInvokeException("JavaClassTaskHandler方法返回值定义错误", e); } catch (ClassNotFoundException e) { throw new TaskInvokeException("JavaClassTaskHandler找不到对应类", e); } catch (InstantiationException e) { throw new TaskInvokeException("JavaClassTaskHandler实例化错误", e); } } } package cn.bit.quartz.core.handler.impl; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.enums.Result; import cn.bit.quartz.core.exception.TaskInvokeException; import cn.bit.quartz.core.handler.ITaskHandler; import cn.bit.quartz.core.util.SpringContextHolder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.stereotype.Component; import org.springframework.util.ReflectionUtils; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @Slf4j @Component public class SpringBeanTaskHandler implements ITaskHandler { @Override public void invoke(Task task) throws TaskInvokeException { try { Object target; Method method; Result returnValue; //上下文寻找对应bean target = SpringContextHolder.getApplicationContext().getBean(task.getBeanName()); //寻找对应方法 if(task.getParams()==null|| task.getParams().isEmpty()) { method = target.getClass().getDeclaredMethod(task.getMethodName()); ReflectionUtils.makeAccessible(method); returnValue = (Result) method.invoke(target); } else{ method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class); ReflectionUtils.makeAccessible(method); returnValue = (Result) method.invoke(target, task.getParams()); } //判断业务是否执行成功 if(returnValue==null || Result.FAIL.equals(returnValue)) throw new TaskInvokeException("SpringBeanTaskHandler方法执行失败", null); }catch (NoSuchBeanDefinitionException e){ throw new TaskInvokeException("SpringBeanTaskHandler找不到对应bean",e); } catch (NoSuchMethodException e) { throw new TaskInvokeException("SpringBeanTaskHandler找不到对应方法",e); } catch (InvocationTargetException | IllegalAccessException e) { throw new TaskInvokeException("SpringBeanTaskHandler执行反射方法异常",e); } catch (ClassCastException e) { throw new TaskInvokeException("SpringBeanTaskHandler方法返回值定义错误",e); } } } package cn.bit.quartz.core.handler; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.enums.TaskType; import cn.bit.quartz.core.handler.impl.JavaClassTaskHandler; import cn.bit.quartz.core.handler.impl.SpringBeanTaskHandler; import cn.bit.quartz.core.util.SpringContextHolder; import org.springframework.stereotype.Component; @Component public class TaskHandlerFactory { public static ITaskHandler getTaskHandler(Task task) { ITaskHandler taskHandler = null; if(TaskType.SPRING_BEAN.getCode().equals(task.getType())) { taskHandler = SpringContextHolder.getApplicationContext().getBean(SpringBeanTaskHandler.class); } if(TaskType.JAVA_CLASS.getCode().equals(task.getType())) { taskHandler = SpringContextHolder.getApplicationContext().getBean(JavaClassTaskHandler.class); } return taskHandler; } }

10.events包下创建任务调用事件,监听者和发布者,其中发布者实现Job接口

package cn.bit.quartz.core.events.event; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; @ToString @Getter @AllArgsConstructor public class TaskInvokeEvent { private final String taskName; private final String taskGroup; } package cn.bit.quartz.core.events.listener; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.entity.TaskLog; import cn.bit.quartz.core.enums.Result; import cn.bit.quartz.core.events.event.TaskInvokeEvent; import cn.bit.quartz.core.exception.TaskInvokeException; import cn.bit.quartz.core.handler.ITaskHandler; import cn.bit.quartz.core.handler.TaskHandlerFactory; import cn.bit.quartz.core.service.TaskLogService; import cn.bit.quartz.core.service.TaskService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.util.Date; @Slf4j @AllArgsConstructor @Component public class TaskInvokeListener { private final TaskLogService taskLogService; private final TaskService taskService; @Async @Order @EventListener(TaskInvokeEvent.class) public void notifyTaskInvoke(TaskInvokeEvent event) { //从数据库中拿取任务 Task task = taskService.selectTaskByNameAndGroup(event.getTaskName(), event.getTaskGroup()); log.info("任务执行事件监听,准备执行任务{}",task); ITaskHandler handler = TaskHandlerFactory.getTaskHandler(task); long startTime = System.currentTimeMillis(); TaskLog taskLog = new TaskLog(); taskLog.setTaskId(task.getId()); taskLog.setStartTime(new Date()); boolean success = true; try { handler.invoke(task); } catch (TaskInvokeException e) { log.error("{},Task:{}", e.getMessage(),task); success = false; taskLog.setMessage(e.getMessage()); if(e.getException()!=null){ taskLog.setExceptionInfo(e.getException().getCause().toString()); e.getException().printStackTrace(); } } if(success) { taskLog.setMessage("执行成功"); taskLog.setResult(Result.SUCCESS.getCode()); task.setResult(Result.SUCCESS.getCode()); taskService.setTaskResult(task); } else { taskLog.setResult(Result.FAIL.getCode()); task.setResult(Result.FAIL.getCode()); taskService.setTaskResult(task); } long endTime = System.currentTimeMillis(); taskLog.setExecuteTime(String.valueOf(endTime-startTime)); taskLogService.insert(taskLog); } } package cn.bit.quartz.core.events.publisher; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.events.event.TaskInvokeEvent; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; @AllArgsConstructor @Slf4j @Component public class TaskInvokePublisher implements Job { private final ApplicationEventPublisher publisher; @Override public void execute(JobExecutionContext jobExecutionContext){ Task task = (Task) jobExecutionContext.getJobDetail().getJobDataMap().get("task"); //发布事件异步执行任务 TaskInvokeEvent event =new TaskInvokeEvent(task.getTaskName(),task.getTaskGroup()); publisher.publishEvent(event); log.info("任务执行事件发布:{}",event); } }

11.mvc下创建对应任务控制类

package cn.bit.quartz.core.mvc; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.events.publisher.TaskInvokePublisher; import lombok.AllArgsConstructor; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.quartz.*; import org.springframework.stereotype.Service; @Slf4j @Service @AllArgsConstructor public class TaskPool { public static JobKey getJobKey(@NonNull Task task) { return JobKey.jobKey(task.getTaskName(),task.getTaskGroup()); } public static TriggerKey getTriggerKey(@NonNull Task task) { return TriggerKey.triggerKey(task.getTaskName(),task.getTaskGroup()); } /** * 任务池添加任务 * @param task * @param scheduler * @throws SchedulerException */ public void addTask(@NonNull Task task,Scheduler scheduler) throws SchedulerException { JobKey jobKey = getJobKey(task); TriggerKey triggerKey = getTriggerKey(task); JobDetail jobDetail = JobBuilder.newJob(TaskInvokePublisher.class).withIdentity(jobKey).build(); jobDetail.getJobDataMap().put("task",task); CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression()); CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder) .build(); scheduler.scheduleJob(jobDetail, trigger); } /** * 任务池暂停并移除任务 * @param task * @param scheduler * @throws SchedulerException */ public void pauseTask(@NonNull Task task,Scheduler scheduler) throws SchedulerException { scheduler.pauseJob(getJobKey(task)); scheduler.deleteJob(getJobKey(task)); } } package cn.bit.quartz.core.mvc; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.enums.TaskStatus; import cn.bit.quartz.core.exception.TaskRepositoryException; import cn.bit.quartz.core.service.TaskService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.List; @Slf4j @Service @AllArgsConstructor public class TaskManger { private final Scheduler scheduler; private final TaskService taskService; private final TaskPool taskPool; /** * 从数据库中反序列化任务数据,保证服务器重启后恢复任务池状态 * @throws SchedulerException */ @PostConstruct public void init() throws SchedulerException { log.info("TaskManager初始化开始..."); List<Task> tasks = taskService.selectAllTask(); if(tasks != null && !tasks.isEmpty()) { for (Task task : tasks) { if(TaskStatus.RUNNING.getCode().equals(task.getStatus())) taskPool.addTask(task,scheduler); } log.info("初始化加载{}项任务", tasks.size()); } log.info("TaskManager初始化结束..."); } /** * 添加暂停且未被持久化的新任务 * @param task * @throws SchedulerException */ public void addTask(Task task) throws SchedulerException { Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()); if(temp != null) throw new TaskRepositoryException("存在相同任务组和任务名任务",task); if(!TaskStatus.PAUSE.getCode().equals(task.getStatus())) throw new TaskRepositoryException("只能添加暂停的任务",task); taskService.insertTask(task); log.info("添加任务{}", task); } /** * 在任务暂停时更新任务信息 * @param task * @throws SchedulerException */ public void updateTask(Task task) throws SchedulerException { Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()); if(temp == null) throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task); if(!TaskStatus.PAUSE.getCode().equals(temp.getStatus())) throw new TaskRepositoryException("只能暂停时更新任务",task); taskService.updateTaskInfo(task); log.info("更新任务{}", task); } /** * 启动暂停中任务 * @param task 只使用name和group字段 * @throws SchedulerException */ public void startTask(Task task) throws SchedulerException { Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()); if(temp == null) throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task); if(!TaskStatus.PAUSE.getCode().equals(temp.getStatus())) throw new TaskRepositoryException("只能启动暂停中任务",task); taskPool.addTask(temp,scheduler); //添加任务池未有异常时持久化数据 temp.setStatus(TaskStatus.RUNNING.getCode()); taskService.updateTaskStatus(temp); log.info("启动任务{}", temp); } /** * 暂停运行中任务 * @param task 只使用name和group字段 * @throws SchedulerException */ public void pauseTask(Task task) throws SchedulerException { Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()); if(temp == null) throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task); if(!TaskStatus.RUNNING.getCode().equals(temp.getStatus())) throw new TaskRepositoryException("只能暂停运行中任务",task); taskPool.pauseTask(temp,scheduler); //添加任务池未有异常时持久化数据 temp.setStatus(TaskStatus.PAUSE.getCode()); taskService.updateTaskStatus(temp); log.info("暂停任务{}", temp); } /** * 暂停暂停中任务 * @param task 只使用name和group字段 * @throws SchedulerException */ public void deleteTask(Task task) throws SchedulerException { Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup()); if(temp == null) throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task); if(!TaskStatus.PAUSE.getCode().equals(temp.getStatus())) throw new TaskRepositoryException("只能删除暂停中任务",task); taskService.deleteTask(temp); log.info("删除任务{}", temp); } }

12.以core同级创建controller包作为quartz服务对外接口,并创建类

package cn.bit.quartz.controller; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.mvc.TaskManger; import lombok.AllArgsConstructor; import org.quartz.SchedulerException; import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/quartz") @AllArgsConstructor public class QuartzController { private TaskManger taskManger; @PostMapping("/add") public String add(@RequestBody Task task) throws SchedulerException { taskManger.addTask(task); return "success"; } @PutMapping("/update") public String update(@RequestBody Task task) throws SchedulerException { taskManger.updateTask(task); return "success"; } @PutMapping("/start") public String start(@RequestBody Task task) throws SchedulerException { taskManger.startTask(task); return "success"; } @PutMapping("/pause") public String pause(@RequestBody Task task) throws SchedulerException { taskManger.pauseTask(task); return "success"; } @DeleteMapping("/delete") public String delete(@RequestBody Task task) throws SchedulerException { taskManger.deleteTask(task); return "success"; } }
六、单模块测试

在controller包下创建测试类

package cn.bit.quartz.controller; import cn.bit.quartz.core.enums.Result; import org.springframework.stereotype.Component; import java.util.Date; @Component("test") public class Test { public Result test(String param) { System.out.println("test "+param+" "+new Date()); return Result.SUCCESS; } }

单独启动quartz服务,分别使用postman依次发出下面请求

{ "taskName": "task1", "taskGroup": "group1", "type": 2, "beanName": "test", "methodName": "test", "params": "test1", "cronExpression": "*/5 * * * * ?", "status": 0 }

控制台输出:

{ "taskName": "task1", "taskGroup": "group1" }

控制台每5s执行一次任务

{ "taskName": "task1", "taskGroup": "group1" }

控制台显示任务被暂停

(从下面开始则为将quartz整合至微服务项目中,为组件复用,会有很多模块间的导入,但同时又需要考虑到spring上下文不互通导致无法获取其他模块的bean,本文采用的方式是将公共类抽取至common模块中,其他模块均导入common模块,选择性导入common中bean,但同时可能各模块core中的实体类不在本模块中,有利有弊,以及模块的导入涵盖了Base章节的大部分内容,有一定门槛建议先了解Base章节中内容再继续,当然如果只是想知道springboot中quartz模块的使用到这里就可以不用往下看了)


七、服务整合 1.服务注册

quartz模块创建bootstrap.yml内容如下:

spring: application: name: quartz cloud: nacos: server-addr: localhost:8848 discovery: cluster-name: HZ namespace: @namespace@

添加后启动在nacos中能够看到服务被注册

2.网关路由

修改gateway配置文件内容如下

server: port: @gateway.port@ spring: application: name: gateway cloud: nacos: server-addr: localhost:8848 discovery: namespace: @namespace@ gateway: routes: - id: user-service uri: lb://user-service predicates: - name: Path args: _genkey_0: /user/** filters: - name: AddRequestHeader args: name: source value: request user from gateway - id: order-service uri: lb://order-service predicates: - name: Path args: _genkey_0: /order/** filters: - name: AddRequestHeader args: name: source value: request order from gateway - id: quartz uri: lb://quartz predicates: - name: Path args: _genkey_0: /quartz/**

启动网关服务和quartz服务后,通过网关端口调用接口(网关有个简单的鉴权需要token = admin放行)

3.全局异常处理

common模块创建exception包用于定义全局异常,创建业务异常内容如下:

package cn.bit mon.exception; import lombok.NoArgsConstructor; @NoArgsConstructor public class BizException extends RuntimeException { public BizException(String message) { super(message); } public BizException(Throwable cause) { super(cause); } public BizException(String message, Throwable cause) { super(message, cause); } public BizException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } }

各模块的业务逻辑异常应当由各模块自己处理,为方便复用,处理类放于common模块下,common导入web依赖

<dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> </dependency>

在common模块新建handler包,以及异常处理类,内容如下,此处将Pro2-1章的jsr异常也提前添加。

package cn.bit mon.handler; import cn.bit mon.exception.BizException; import cn.bit mon.pojo.vo.R; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.validation.BindException; import org.springframework.validation.FieldError; import org.springframework.web.bind.MethodArgumentNotValidException; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestControllerAdvice; import java.nio.file.AccessDeniedException; import java.util.List; @Slf4j @RestControllerAdvice public class GlobalExceptionHandler { /** * 全局异常. * @param e the e * @return R */ @ExceptionHandler(Exception.class) @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) public R handleGlobalException(Exception e) { log.error("全局异常信息 ex={}", e.getMessage(), e); return R.failed(e.getLocalizedMessage()); } /** * AccessDeniedException * @param e the e * @return R */ @ExceptionHandler(AccessDeniedException.class) @ResponseStatus(HttpStatus.FORBIDDEN) public R handleAccessDeniedException(AccessDeniedException e) { log.error("拒绝授权异常信息 ex={}", e.getLocalizedMessage(),e); return R.failed(e.getLocalizedMessage()); } /** * 业务处理类 * @param e the e * @return R */ @ExceptionHandler({ BizException.class }) @ResponseStatus(HttpStatus.BAD_REQUEST) public R bizExceptionHandler(BizException e) { log.warn("业务处理异常,ex = {}", e.getMessage()); return R.failed(e.getMessage()); } /** * validation Exception * @param e the e * @return R */ @ExceptionHandler({ MethodArgumentNotValidException.class}) @ResponseStatus(HttpStatus.BAD_REQUEST) public R handleBodyValidException(MethodArgumentNotValidException e) { List<FieldError> fieldErrors = e.getBindingResult().getFieldErrors(); StringBuilder errorMsg = new StringBuilder(); fieldErrors.forEach(fieldError -> {errorMsg.append(fieldError.getField()).append(":").append(fieldError.getDefaultMessage()).append(" ");}); log.warn("参数绑定异常,ex = {}",errorMsg); return R.failed(errorMsg.toString()); } /** * validation Exception (以form-data形式传参) * @param e the e * @return R */ @ExceptionHandler({ BindException.class}) @ResponseStatus(HttpStatus.BAD_REQUEST) public R bindExceptionHandler(BindException e) { List<FieldError> fieldErrors = e.getBindingResult().getFieldErrors(); StringBuilder errorMsg = new StringBuilder(); fieldErrors.forEach(fieldError -> {errorMsg.append(fieldError.getField()).append(":").append(fieldError.getDefaultMessage()).append("\n");}); log.warn("参数绑定异常(form-data),ex = {}",errorMsg); return R.failed(errorMsg.toString()); } }  4.统一响应体格式以及异常统一

quartz中添加项目中common模块的依赖,从中拿取公共类

<dependency> <groupId>cn.bit</groupId> <artifactId>common</artifactId> <version>1.0-SNAPSHOT</version> </dependency>

修改quartz模块controller内容如下:

package cn.bit.quartz.controller; import cn.bit mon.exception.BizException; import cn.bit mon.pojo.vo.R; import cn.bit.quartz.core.entity.Task; import cn.bit.quartz.core.exception.TaskRepositoryException; import cn.bit.quartz.core.mvc.TaskManger; import lombok.AllArgsConstructor; import org.quartz.SchedulerException; import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/quartz") @AllArgsConstructor public class QuartzController { private TaskManger taskManger; @PostMapping("/add") public R add(@RequestBody Task task){ try { taskManger.addTask(task); } catch (SchedulerException | TaskRepositoryException e) { throw new BizException(e); } return R.ok("添加任务成功"); } @PutMapping("/update") public R update(@RequestBody Task task){ try { taskManger.updateTask(task); }catch (SchedulerException | TaskRepositoryException e) { throw new BizException(e.getMessage()); } return R.ok("任务更新成功"); } @PutMapping("/start") public R start(@RequestBody Task task){ try { taskManger.startTask(task); }catch (SchedulerException | TaskRepositoryException e) { throw new BizException(e.getMessage()); } return R.ok("任务启动成功"); } @PutMapping("/pause") public R pause(@RequestBody Task task){ try { taskManger.pauseTask(task); } catch (SchedulerException | TaskRepositoryException e) { throw new BizException(e.getMessage()); } return R.ok("任务暂停成功"); } @DeleteMapping("/delete") public R delete(@RequestBody Task task){ try { taskManger.deleteTask(task); } catch (SchedulerException | TaskRepositoryException e) { throw new BizException(e.getMessage()); } return R.ok("任务删除成功"); } }

并在quartz模块的application中引入异常处理类的bean

package cn.bit.quartz; import cn.bit mon.handler.GlobalExceptionHandler; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Import; @SpringBootApplication @Import(GlobalExceptionHandler.class) public class QuartzApplication { public static void main(String[] args) { SpringApplication.run(QuartzApplication.class, args); } }
八、整合测试

启动网关和quartz模块,postman测试接口

访问网关接口,不加admin,返回401未鉴权错误码

尝试添加相同任务

启动暂停任务

启动运行任务


最后:

本来是想再添加对应的feignclient但实在想不出有什么业务需要调用order-service或user-service来开启定时任务,直接调用quartz模块不是来的更快。至此,关于quartz定时模块的研究到此正式完结撒花 ,基本框架不会变更了,后续仅做微调,以及最后希望大家多多支持!

标签:

从零搭建微服务项目Pro(第1-3章——Quartz定时任务模块整合)由讯客互联软件开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“从零搭建微服务项目Pro(第1-3章——Quartz定时任务模块整合)