主页 > 电脑硬件  > 

Springboot使用Redis发布订阅自动更新缓存数据源

Springboot使用Redis发布订阅自动更新缓存数据源
背景

当项目有很多数据源的时候,通常会在启动时就把数据源连接加载到缓存中。当数据源发生变更后,如何自动、实时地更新缓存中的数据源呢?如果是单个项目,直接调用接口方法就行了;但如果涉及分布式的多个系统呢?

解决方案:

使用 Redis 的发布/订阅(轻量级消息队列)功能,它可以实现实时通知、实时状态更新等功能,配合 AOP 实现数据源状态的自动更新。

下面结合代码写一个使用示例:

1.首先创建数据源对象

import cn.hutool.core.collection.CollectionUtil; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Getter; import lombok.Setter; import lombok.ToString; import lombok.experimental.Accessors; import org.apache mons.lang3.StringUtils; import java.io.Serializable; import java.util.ArrayList; import java.util.Date; import java.util.List; /** * * @author ws * @since 2022-08-12 */ @Getter @Setter @ToString @Accessors(chain = true) @TableName("ed_datasource_info") public class DatasourceInfo implements Serializable { private static final long serialVersionUID = 1L; @TableId(value = "id", type = IdType.AUTO) private Integer id; /** * 数据源编码 */ @TableField("datasource_code") private String datasourceCode; /** * 数据源名称 */ @TableField("datasource_name") private String datasourceName; /** * 数据源类型 */ @TableField("datasource_type") private String datasourceType; /** * 类型 0:数据库 1:Rest-api */ @TableField("type") private Integer type; /** * 创建人 */ @TableField("creator") private String creator; /** * 模式 */ @TableField("schema_name") private String schemaName; @TableField("create_time") private Date createTime; @TableField("update_time") private Date updateTime; /** * 数据源连接信息 */ @TableField("link_json") private String linkJson; }

2.初始化启动加载数据源

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.sztech mon.constant.DataSourceTypeEnum; import com.sztech.entity.DatasourceInfo; import com.sztech.service.DatasourceInfoService; import lombok.extern.slf4j.Slf4j; import org.apache mons.lang3.StringUtils; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @Slf4j @Component public class DataSourceRecovery implements InitializingBean { @Resource private DatasourceInfoService datasourceInfoService; @Override public void afterPropertiesSet() throws Exception { refresh(); } private void refresh() throws Exception{ this.refresh(null); } public void refresh(String sourceCode){ QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("type", DataSourceTypeEnum.DB.getKey()); if(StringUtils.isNotBlank(sourceCode)){ queryWrapper.eq("datasource_code",sourceCode); } List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper); if(CollectionUtils.isEmpty(list)){ return; } CountDownLatch countDownLatch = new CountDownLatch(list.size()); for(DatasourceInfo datasourceInfo : list){ new Thread(new ReadloadThread(datasourceInfo, countDownLatch)).start(); } try { countDownLatch.await(1,TimeUnit.MINUTES); } catch (InterruptedException e) { log.error("数据源加载等待超时",e); } } /** * 多线程加载数据源,提高启动速度 */ static class ReadloadThread implements Runnable { private DatasourceInfo datasourceInfo; private CountDownLatch countDownLatch; public ReadloadThread() { } public ReadloadThread(DatasourceInfo datasourceInfo,CountDownLatch countDownLatch) { this.datasourceInfo = datasourceInfo; this.countDownLatch = countDownLatch; } @Override public void run() { try { DataSourceContext.setClientMap(datasourceInfo); 
DataSourceContext.setConfigMap(datasourceInfo.getDatasourceCode(),datasourceInfo); }catch (Exception e){ log.error("datasource:{},加载失败",datasourceInfo.getDatasourceCode(),e); }finally { countDownLatch.countDown(); } } } }

3.创建DataSourceContext,用于缓存数据源连接

import com.sztech.core.tool.DBTool;
import com.sztech.entity.DatasourceInfo;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Process-wide cache of data source clients and their configurations, keyed by data source code.
 *
 * User: wangsheng
 * Date: 2022-02-11
 * Time: 14:05
 */
public class DataSourceContext {

    /** Client cache: data source code -> opened client. */
    private static final Map<String, IClient> clientMap = new ConcurrentHashMap<>();

    /** Configuration cache: data source code -> data source definition. */
    private static final Map<String, DatasourceInfo> configMap = new ConcurrentHashMap<>();

    /**
     * Builds a client for the data source and caches it under its code.
     *
     * <p>The new client is built before the old one is touched, so a failing build leaves the
     * previously cached client in place instead of a closed one. The client that was replaced
     * is closed afterwards.
     *
     * @param datasourceInfo data source definition; its code is the cache key
     */
    public static void setClientMap(DatasourceInfo datasourceInfo) {
        IClient fresh = DBTool.buildClient(datasourceInfo);
        // put() hands back the replaced entry atomically, avoiding a containsKey/get/put race.
        closeQuietly(clientMap.put(datasourceInfo.getDatasourceCode(), fresh));
    }

    /**
     * Caches a data source definition.
     *
     * @param key            data source code
     * @param datasourceInfo definition to cache
     */
    public static void setConfigMap(String key, DatasourceInfo datasourceInfo) {
        configMap.put(key, datasourceInfo);
    }

    /**
     * Removes the cached client for the code, closing it if one was present.
     *
     * @param key data source code
     */
    public static void removeClientMap(String key) {
        closeQuietly(clientMap.remove(key));
    }

    /**
     * Removes the cached definition for the code.
     *
     * @param key data source code
     */
    public static void removeConfigMap(String key) {
        configMap.remove(key);
    }

    /**
     * Returns the cached client for the code.
     *
     * @param key data source code
     * @return the cached client, never {@code null}
     * @throws RuntimeException if no client is cached under the code
     */
    public static IClient getClientMap(String key) {
        IClient client = clientMap.get(key);
        if (null == client) {
            throw new RuntimeException(String.format("数据源编码:[%s]不存在或被删除...", key));
        }
        return client;
    }

    /**
     * Returns the cached definition for the code.
     *
     * @param key data source code
     * @return the cached definition, never {@code null}
     * @throws RuntimeException if no definition is cached under the code
     */
    public static DatasourceInfo getConfigMap(String key) {
        DatasourceInfo datasourceInfo = configMap.get(key);
        if (null == datasourceInfo) {
            throw new RuntimeException(String.format("数据源编码:[%s]不存在或被删除...", key));
        }
        return datasourceInfo;
    }

    /** Closes a client that has left the cache; a failing close must not affect the cache update. */
    private static void closeQuietly(IClient client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception ignored) {
            // Best effort: the client is already out of the cache, nothing more can be done here.
        }
    }
}

package com.sztech.core.tool;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Instance;
import com.sztech.common.constant.ResultEnum;
import com.sztech.common.exception.BizException;
import com.sztech.common.utils.ReflectionUtils;
import com.sztech.common.utils.SpringUtils;
import com.sztech.common.utils.ThreadPoolUtil;
import com.sztech.core.datasource.DataSourceContext;
import com.sztech.core.datasource.IClient;
import com.sztech.core.datasource.rdbms.RdbmsConfig; import com.sztech.entity.*; import com.sztech.pojo.dto.ColumnDto; import com.sztech.pojo.dto.QueryTableDto; import com.sztech.pojo.dto.TableDto; import com.sztech.pojo.node.PartitionColumn; import com.sztech.pojo.vo.*; import com.sztech.service.CreateTableLogService; import lombok.extern.slf4j.Slf4j; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import java.sql.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; /** * Description: * User: wangsheng * Date: 2022-08-12 * Time: 16:59 */ @Slf4j public class DBTool { /** * 建立客户端 */ public static IClient buildClient(DatasourceInfo datasourceInfo) { IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class); return client.open(datasourceInfo); } /** * 测试数据源 * * @return */ public static boolean testSource(DatasourceInfo datasourceInfo) { IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class); return client.testSource(datasourceInfo); } public static List<String> getSchemas(DatasourceInfo datasourceInfo) { List<String> schemas = new ArrayList<>(); Connection conn = null; try { IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class); Class.forName(client.driverName()); String linkJson = datasourceInfo.getLinkJson(); RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class); conn = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getDecodePassword()); DatabaseMetaData metadata = conn.getMetaData(); try (ResultSet resultSet = metadata.getSchemas()) { while (resultSet.next()) { String schemaName = resultSet.getString("TABLE_SCHEM"); schemas.add(schemaName); } } } catch (SQLException e) { throw new 
RuntimeException(e); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } finally { if (conn != null) { try { conn.close(); } catch (SQLException ex) { ex.printStackTrace(); } } } return schemas; } /** * 获取驱动名称 */ public static String getDriverName(String datasourceType) { IClient client = ReflectionUtils.getInstanceFromCache(datasourceType, "type", IClient.class); return client.driverName(); } /** * 获取表中列信息 */ public static List<ColumnDto> getColumns(String datasourceCode, String tableName) { return DataSourceContext.getClientMap(datasourceCode).getColumns(tableName); } /** * 获取表中分区列信息 */ public static List<String> getPartitionColumns(String datasourceCode, String tableName) { return DataSourceContext.getClientMap(datasourceCode).getPartitionColumns(tableName); } /** * 获取表信息 */ public static List<String> getTableNames(String datasourceCode, String tableNameLike) { return DataSourceContext.getClientMap(datasourceCode).getTableNames(tableNameLike); } /** * 获取表信息 */ public static List<TableDto> getTables(String datasourceCode) { return DataSourceContext.getClientMap(datasourceCode).getTables(); } /** * 获取单个表信息 */ public static TableDto getTableByName(String datasourceCode, String tableName) { return DataSourceContext.getClientMap(datasourceCode).getTableByName(tableName); } /** * 获取单个表信息(创建时间,字段数) */ public static TableDto getTableField(String datasourceCode, String tableName) { return DataSourceContext.getClientMap(datasourceCode).getTableField(tableName); } /** * 获取表信息(获取创建时间) * * @param dto * @return */ public static TableInfoVo getTableData(QueryTableDto dto) { IClient client = DataSourceContext.getClientMap(dto.getDataSourceCode()); return client.getTableInfo(dto.getTableName()); } /** * 根据字段type建表 */ public static void createTableByColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) { IClient client = DataSourceContext.getClientMap(datasourceCode); List<String> sqls = client.buildTableSql(columnDtos, tableName, true); 
log.info("执行建表语句为:" + JSON.toJSONString(sqls)); sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>())); } /** * 根据字段type建表 */ public static void createTableByNotTransformedColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) { IClient client = DataSourceContext.getClientMap(datasourceCode); List<String> sqls = client.buildTableSql(columnDtos, tableName, false); log.info("执行建表语句为:" + JSON.toJSONString(sqls)); sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>())); } /** * 创建索引 * 注: oracle 索引名在整个库里必须唯一 否则建立失败 * * @param datasourceCode 数据源编码 * @param tableName 表名 * @param filedNames filed1,filed2... * @param unique 唯一 */ public static void createIndex(String datasourceCode, String tableName, String filedNames, Boolean unique) { DataSourceContext.getClientMap(datasourceCode).createIndex(tableName, filedNames, unique); } /** * sql校验 * * @param datasourceCode * @param sql * @param sourceType * @return */ public static Map<String, Object> checkSql(String datasourceCode, String sql, String sourceType) { IClient client = DataSourceContext.getClientMap(datasourceCode); return client.checkSql(sql, sourceType); } /** * 根据sql创建表 * * @param datasourceCode * @param sql */ public static void createTableWithSql(String datasourceCode, String sql) { IClient client = DataSourceContext.getClientMap(datasourceCode); log.info("执行建表语句为:" + JSON.toJSONString(sql)); client.executeCommandSyn(sql, new HashMap<>()); // DataSourceContext.getClientMap(datasourceCode).createTableWithSql(sql); } /** * 删除表 * * @param datasourceCode * @param tableName */ public static void dropTable(String datasourceCode, String tableName) { DataSourceContext.getClientMap(datasourceCode).dropTable(tableName); } /** * 单表查询数据 */ public static List<Map<String, Object>> selectDataFromTable(String datasourceCode, List<DataTableColumn> columns, String tableName, String search, Integer limit) { IClient client = DataSourceContext.getClientMap(datasourceCode); // 获取查询语句 String 
selectSql = client.getSelectSql(columns, tableName, search, limit); log.info("执行语句:" + selectSql); return client.selectDataFromTable(selectSql, null); } /** * 单表查询数据 */ public static List<Map<String, Object>> selectFromTable(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) { IClient client = DataSourceContext.getClientMap(datasourceCode); // 获取查询语句 String selectSql = client.getFormSelectSql(columns, searchColumns, tableName, search, pageNum, pageSize, params); log.info("执行语句:" + selectSql); return client.selectDataFromTable(selectSql, params); } /** * 单表查询数据 */ public static List<Map<String, Object>> selectFromForBackUp(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) { IClient client = DataSourceContext.getClientMap(datasourceCode); // 获取查询语句 String selectSql = client.selectFromForBackUp(columns, searchColumns, tableName, search, pageNum, pageSize, params); log.info("执行语句:" + selectSql); return client.selectDataFromTable(selectSql, params); } /** * 单表查询数据 */ public static List<Map<String, Object>> selectFromFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) { IClient client = DataSourceContext.getClientMap(datasourceCode); // 获取查询语句 String selectSql = client.getFormSelectSqlForFile(columns, searchColumns, tableName, search, pageNum, pageSize, params); log.info("执行语句:" + selectSql); return client.selectDataFromTable(selectSql, params); } /** * 查询单表是否存在文件名 */ public static List<Map<String, Object>> getExistOldName(String datasourceCode, String tableName, String search) { IClient client = DataSourceContext.getClientMap(datasourceCode); // 获取查询语句 String selectSql = 
client.getExistOldName( tableName, search); log.info("执行语句:" + selectSql); return client.selectDataFromTable(selectSql, null); } /** * 单表查询数据(查询归集表专门使用) */ public static List<Map<String, Object>> selectCollectTable(CollectConditionVo vo) { IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode()); // 获取查询语句 String selectSql = client.getCollectTable(vo); log.info("执行语句:" + selectSql); return client.selectDataFromTable(selectSql, vo.getParams()); } /** * 单表查询数据量 */ public static Map<String, Object> getFormCount(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) { IClient client = DataSourceContext.getClientMap(datasourceCode); // 获取查询语句 String selectSql = client.getCountSql(columns, searchColumns, tableName, search, params); log.info("执行语句:" + selectSql); return client.getCount(selectSql, params); } /** * 查询区县库表的数据量 */ public static Map<String, Object> getCountryCount(String datasourceCode, String tableName, MapSqlParameterSource params) { IClient client = DataSourceContext.getClientMap(datasourceCode); // 获取查询语句 String selectSql ="select count(1) as count from "+tableName; log.info("执行语句:" + selectSql); return client.getCount(selectSql, params); } public static Map<String, Object> getFormCountForFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) { IClient client = DataSourceContext.getClientMap(datasourceCode); // 获取查询语句 String selectSql = client.getCountSqlForFile(columns, searchColumns, tableName, search, params); log.info("执行语句:" + selectSql); return client.getCount(selectSql, params); } /** * 查询表数据量 */ public static Long getTableRows(String datasourceCode, String tableName) { IClient client = DataSourceContext.getClientMap(datasourceCode); return client.getTableRows(tableName); } /** * 查询表对应分区数据量 */ public static Long getTablePartitionRows(String 
datasourceCode, String tableName, List<PartitionColumn> partitionColumns) { IClient client = DataSourceContext.getClientMap(datasourceCode); return client.getTablePartitionRows(tableName, partitionColumns); } /** * 查询表数据量 */ public static Integer getTablePhysicalSize(String datasourceCode, String tableName) { IClient client = DataSourceContext.getClientMap(datasourceCode); return client.getPhysicalSize(tableName); } /** * 获取表最大值 * * @param datasourceCode 数据源编码 * @param tableName 表名 * @param incColumnName 自增列名 * @return {@link Integer} */ public static Object getMaxValue(String datasourceCode, String tableName, String incColumnName, String condition) { return DataSourceContext.getClientMap(datasourceCode).getMaxValue(tableName, incColumnName, condition); } public static Object getMaxValue(String datasourceCode, String schema, String tableName, String incColumnName, String condition) { return DataSourceContext.getClientMap(datasourceCode).getMaxValue(schema, tableName, incColumnName, condition); } public static Object getMaxTime(String datasourceCode, String schema, String tableName, String incColumnName, String tongId,String condition) { return DataSourceContext.getClientMap(datasourceCode).getMaxTime(schema, tableName, incColumnName,tongId, condition); } /** * 字段存在 * * @param datasourceCode 数据源编码 * @param tableName 表名 * @param fieldName 字段名 * @return {@link Boolean} */ public static Boolean fieldExist(String datasourceCode, String tableName, String fieldName) { List<ColumnDto> columns = getColumns(datasourceCode, tableName); return columns.stream().anyMatch(s -> s.getName().equalsIgnoreCase(fieldName)); } /** * 数据预览 获取前十条 * * @return */ public static String dataView(String datasourceCode, String tableName, String condition) { return DataSourceContext.getClientMap(datasourceCode).dataView(tableName, condition); } /** * 创建分区临时表 * odps适用 */ public static void createPartitionedTableByColumns(List<ColumnDto> columnDtos, String tableName, String tableComment, String 
partitionedField, String datasourceCode) { DataSourceContext.getClientMap(datasourceCode).createPartitionedTableByColumns(columnDtos, tableName, tableComment, partitionedField); } /** * 同步执行命令 */ public static void executeCommandSyn(String datasourceCode, String command, Map<String, Object> params) { DataSourceContext.getClientMap(datasourceCode).executeCommandSyn(command, params); } /** * 异步执行命令 * odps适用 */ public static Instance executeCommandASyn(String datasourceCode, String command, Map<String, Object> params) { return DataSourceContext.getClientMap(datasourceCode).executeCommandASyn(command, params); } /** * 是否有导出权限 * odps适用 * * @param datasourceCode 数据源编码 * @param tableName 表名 * @return {@link Boolean} */ public static Boolean exportEnable(String datasourceCode, String tableName) { return DataSourceContext.getClientMap(datasourceCode).exportEnable(tableName); } /** * 插入单条数据 * * @param datasourceCode * @param vo * @return */ public static Integer insert(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).insert(vo); } /** * 批量插入数据 * * @param datasourceCode * @param vo * @return */ public static Integer[] betchInsert(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).betchInsert(vo); } /** * 批量插入数据 * * @param datasourceCode * @param vo * @return */ public static Integer[] betchInsertByConnection(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).betchInsertByConnection(vo); } /** * 这个方法不需要分装参数,直接传字段名称list就好了 * @param datasourceCode * @param vo * @return */ public static Integer[] betchInsertForCommom(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).betchInsertForCommom(vo); } /** * 删除数据 * * @param datasourceCode * @param vo * @return */ public static Integer delete(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).delete(vo); } /** * 
这个删除方法可以自定义条件服号 * @param datasourceCode * @param vo * @return */ public static Integer deleteForCommon(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).deleteForCommon(vo); } public static Integer deleteForFile(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).deleteForFile(vo); } public static String deleteForPre(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).deleteForPre(vo); } /** * 修改数据 * * @param datasourceCode * @param vo * @return */ public static Integer update(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).update(vo); } /** * 修改数据 * * @param datasourceCode * @param vo * @return */ public static Integer updateForFile(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).updateForFile(vo); } /** * 获取表单基本信息 * * @param vo * @return */ public static TableMetaDataVo getTableBasicInfo(String datasourceCode, FormTableVo vo) { return DataSourceContext.getClientMap(datasourceCode).getTableBasicInfo(vo); } /** * 根据字段type建表 */ public static void createCollectTable(List<CatalogColumnInfo> columnDtos, String tableName, String datasourceCode, String tableComment, Boolean ifPartition) { IClient client = DataSourceContext.getClientMap(datasourceCode); List<String> sqls = client.buildTableSqlForCollect(columnDtos, tableName, tableComment, ifPartition); log.info("执行建表语句为:" + JSON.toJSONString(sqls)); try { sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>())); } catch (Exception e) { e.printStackTrace(); String message = e.getMessage(); if (e instanceof BizException) { BizException exception = (BizException) e; message = exception.getMsg(); } log.error("建表错误=======================>{}:", message); ThreadPoolExecutor instance = ThreadPoolUtil.instance(); String finalMessage = message; instance.submit(() -> { CreateTableLog createTableLog = new 
CreateTableLog(); createTableLog.setErrorLog(finalMessage); createTableLog.setParams(JSON.toJSONString(sqls)); createTableLog.setCode(tableName); CreateTableLogService createTableLogService = SpringUtils.getBean(CreateTableLogService.class); createTableLogService.save(createTableLog); }); throw new BizException(ResultEnum.ERROR.getCode(), "建表失败请联系管理员"); } } /** * 根据字段type建表 */ public static void updateCollectTable(CreateCollectVo vo) { IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode()); List<String> sqls = client.buildTableSqlForUpdate(vo); log.info("执行更新表语句为:" + JSON.toJSONString(sqls)); try { sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>())); } catch (Exception e) { e.printStackTrace(); String message = e.getMessage(); if (e instanceof BizException) { BizException exception = (BizException) e; message = exception.getMsg(); } log.error("建表错误=======================>{}:", message); ThreadPoolExecutor instance = ThreadPoolUtil.instance(); String finalMessage = message; instance.submit(() -> { CreateTableLog createTableLog = new CreateTableLog(); createTableLog.setErrorLog(finalMessage); createTableLog.setParams(JSON.toJSONString(sqls)); createTableLog.setCode(vo.getTableName()); CreateTableLogService createTableLogService = SpringUtils.getBean(CreateTableLogService.class); createTableLogService.save(createTableLog); }); log.info("建表失败了开始准备抛出了-------------------------------------->"); throw new BizException(ResultEnum.ERROR.getCode(), "建表失败请联系管理员"); } } /** * 获取数据源下所有表信息(包括表名,表字段数,表创建时间) * * @param datasourceCode * @param tableNameLike * @return */ public static List<TableDto> getTablesDetail(String datasourceCode, String tableNameLike, Integer start, Integer pageSize, String specifyTableName) { return DataSourceContext.getClientMap(datasourceCode).getTablesDetail(tableNameLike, start, pageSize, specifyTableName); } /** * 获取表数量 * @param datasourceCode * @param tableName * @return */ public static Long getTableCountSchema(String 
datasourceCode, String tableName) { return DataSourceContext.getClientMap(datasourceCode).getTableCountSchema(tableName); } public static Integer getTableColumnCount(String dataSourceCode, String tableName) { return DataSourceContext.getClientMap(dataSourceCode).getTableColumnCount(tableName); } public static Integer getPreTableColumnCount(String dataSourceCode, String tableName) { return DataSourceContext.getClientMap(dataSourceCode).getPreTableColumnCount(tableName); } /** * 获取符号 * @return */ public static String getSymbol(String datasourceCode) { return DataSourceContext.getClientMap(datasourceCode).getSymbol(); } } import lombok.extern.slf4j.Slf4j; import org.reflections.Reflections; import java.lang.reflect.Modifier; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class ReflectionUtils { private static final Map<String, Set<?>> clazzMap = new ConcurrentHashMap<>(); private static final ReentrantLock clazzLock = new ReentrantLock(); /** * 通过反射获取接口/抽象类的所有实现类 * 通过缓存类信息减少查找时间 * 接口与抽象类必须放在实现类的同级目录或者父目录 */ @SuppressWarnings("unchecked") public static <T> Set<Class<? extends T>> getReflections(Class<T> clazz) { if (clazzMap.containsKey(clazz.getName())) { return (Set<Class<? extends T>>) clazzMap.get(clazz.getName()); } try { clazzLock.lock(); if (clazzMap.containsKey(clazz.getName())) { return (Set<Class<? extends T>>) clazzMap.get(clazz.getName()); } Reflections reflections = new Reflections(clazz.getPackage().getName()); Set<Class<? 
extends T>> subTypesOf = reflections.getSubTypesOf(clazz); clazzMap.put(clazz.getName(), subTypesOf); return subTypesOf; } catch (Exception e) { log.error("getReflections error", e); } finally { clazzLock.unlock(); } return new HashSet<>(); } /** * 通过反射获取新对象 * @param type type * @param methodName methodName * @param clazz clazz * @return <T> */ public static <T> T getInstance(String type, String methodName, Class<T> clazz) { Set<Class<? extends T>> set = getReflections(clazz); for (Class<? extends T> t : set) { try { //排除抽象类 if (Modifier.isAbstract(t.getModifiers())) { continue; } Object obj = t.getMethod(methodName).invoke(t.newInstance()); if (type.equalsIgnoreCase(obj.toString())) { return t.newInstance(); } } catch (Exception e) { log.error("getInstance error", e); } } throw new RuntimeException("implement class not exist"); } /** * 通过反射获取新对象 * @param type type * @param methodName methodName * @param clazz clazz * @return <T> */ public static <T> T getInstanceFromCache(String type, String methodName, Class<T> clazz) { return getInstance(type, methodName, clazz); } }

 client 客户端接口,用于适配多种数据源

import com.ws.websocket.entity.DatasourceInfo;

/**
 * Contract every data source client implements.
 *
 * User: wangsheng
 * Date: 2022-12-30
 * Time: 10:31
 */
public interface IClient {

    /**
     * Connects to the data source.
     *
     * @param dataSourceInfo data source information
     * @return {@link IClient}
     */
    IClient open(DatasourceInfo dataSourceInfo);

    /** Closes the data source. */
    void close();

    /**
     * @return the driver class name
     */
    String driverName();

    /**
     * @return the data source type this client handles
     */
    String type();

    /**
     * Tests whether the data source is reachable.
     *
     * @param datasourceInfo data source to test
     * @return {@code true} if the test succeeded
     */
    boolean testSource(DatasourceInfo datasourceInfo);
}

import com.ws.websocket.entity.DatasourceInfo;

/** Common base of all clients; holds the data source definition the client was opened with. */
public abstract class AbsClient implements IClient {

    protected DatasourceInfo datasourceInfo;
}

package com.ws.websocket.util;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

/**
 * Base client for JDBC data sources, backed by a Druid connection pool.
 */
@Slf4j
public abstract class AbsRdbmsClient extends AbsClient {

    protected DruidDataSource druidDataSource;

    /**
     * Creates a Druid pool from the JDBC settings in {@code linkJson} and keeps it in this
     * client. A pool left over from an earlier {@code open} call is closed.
     *
     * @param datasourceInfo data source whose {@code linkJson} holds jdbcUrl/username/password
     * @return this client
     */
    @Override
    public IClient open(DatasourceInfo datasourceInfo) {
        RdbmsConfig rdbmsConfig = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
        DruidDataSource pool = new DruidDataSource();
        pool.setInitialSize(5);
        pool.setMinIdle(30);
        pool.setMaxActive(300);
        pool.setMaxWait(10000);
        pool.setBreakAfterAcquireFailure(true); // leave the retry loop after the attempts below
        pool.setConnectionErrorRetryAttempts(3); // retry three times
        pool.setTimeBetweenConnectErrorMillis(3000);
        pool.setLoginTimeout(3);
        pool.setUrl(rdbmsConfig.getJdbcUrl());
        pool.setDriverClassName(driverName());
        pool.setUsername(rdbmsConfig.getUsername());
        // NOTE(review): the password is used as stored; the original had RSA decoding disabled here.
        pool.setPassword(rdbmsConfig.getPassword());
        // Properties required by the MetaUtil helper.
        Properties properties = new Properties();
        properties.put("remarks", "true");
        properties.put("useInformationSchema", "true");
        pool.setConnectProperties(properties);

        DruidDataSource previous = this.druidDataSource;
        this.druidDataSource = pool;
        this.datasourceInfo = datasourceInfo;
        if (previous != null) {
            // Re-opening must not leak the pool created by the earlier open().
            previous.close();
        }
        return this;
    }

    /** Closes the pool; a no-op if the client was never opened. */
    @Override
    public void close() {
        if (druidDataSource != null) {
            druidDataSource.close();
        }
    }

    /**
     * Opens a direct JDBC connection (bypassing the pool) and checks it is valid within 3 seconds.
     *
     * @return {@code true} if the connection is valid; {@code false} if it is not, or if the
     *         driver is missing or connecting failed (both are logged)
     */
    @Override
    public boolean testSource(DatasourceInfo datasourceInfo) {
        Connection connection = null;
        try {
            Class.forName(driverName());
            String linkJson = datasourceInfo.getLinkJson();
            RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);
            connection = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getPassword());
            return connection.isValid(3);
        } catch (SQLException e) {
            log.error("数据源测试失败", e);
            return false;
        } catch (ClassNotFoundException e) {
            log.error("未找到驱动信息:{}", driverName(), e);
            return false;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException ex) {
                    // Best effort: the test verdict has already been decided.
                    log.warn("关闭测试连接失败", ex);
                }
            }
        }
    }

    /**
     * JDBC settings parsed from {@code linkJson}. Declared {@code static} so the JSON library
     * can instantiate it without an enclosing client instance.
     */
    @Data
    static class RdbmsConfig {

        private String jdbcUrl;
        private String username;
        private String password;

        /** Appends {@code useSSL=false} to the JDBC URL unless the URL already mentions useSSL. */
        public void setSSL() {
            String lowerCase = this.jdbcUrl.toLowerCase();
            if (!lowerCase.contains("usessl")) {
                if (this.jdbcUrl.contains("?")) {
                    this.jdbcUrl = this.jdbcUrl + "&useSSL=false";
                } else {
                    this.jdbcUrl = this.jdbcUrl + "?useSSL=false";
                }
            }
        }
    }
}

import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.Locale;

/**
 * Client for data sources of type {@code DMDB}, using driver {@code dm.jdbc.driver.DmDriver}.
 */
@Slf4j
public class DmClient extends AbsRdbmsClient {

    private String schema;

    @Override
    public String type() {
        return "DMDB";
    }

    @Override
    public String driverName() {
        return "dm.jdbc.driver.DmDriver";
    }

    /**
     * Resolves the schema (the configured schema name, or the upper-cased user name when none
     * is configured), writes it back into {@code datasourceInfo}, then opens the pool.
     */
    @Override
    public IClient open(DatasourceInfo datasourceInfo) {
        RdbmsConfig commonLinkParams = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
        // Locale.ROOT keeps the result independent of the JVM default locale (e.g. Turkish "i").
        this.schema = StringUtils.isNotBlank(datasourceInfo.getSchemaName())
                ? datasourceInfo.getSchemaName()
                : commonLinkParams.getUsername().toUpperCase(Locale.ROOT);
        datasourceInfo.setSchemaName(schema);
        return super.open(datasourceInfo);
    }

    /** Releases the pool created by {@link #open(DatasourceInfo)}; an empty override would leak it. */
    @Override
    public void close() {
        super.close();
    }

    /** Uses the generic JDBC connectivity test instead of unconditionally reporting failure. */
    @Override
    public boolean testSource(DatasourceInfo datasourceInfo) {
        return super.testSource(datasourceInfo);
    }
}

4.创建redis订阅数据源操作频道配置

import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; /** * @Author: wangsheng * @Data: 2022/8/16 16:40 */ @Slf4j @Configuration public class RedisListenerConfig { /** * 订阅数据源操作频道 * * @param connectionFactory connectionFactory * @param dataSourceMonitor 数据源监视器 * @return RedisMessageListenerContainer */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, DataSourceMonitor dataSourceMonitor){ RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(dataSourceMonitor, new PatternTopic("DATASOURCE_CHANNEL")); log.info(dataSourceMonitor.getClass().getName() + " 订阅频道 {}", "DATASOURCE_CHANNEL"); return container; } }

5.redis监听数据源操作

import com.alibaba.fastjson.JSONObject; import com.ws.websocket.entity.DatasourceInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; /** * Description: redis监听数据源操作 * User: wangsheng * Date: 2022-08-12 * Time: 17:07 */ @Slf4j @Component public class DataSourceMonitor implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { JSONObject box = JSONObject.parseObject(new String(message.getBody(), StandardCharsets.UTF_8)); String operation = box.getString("key"); if ("SAVE_OR_UPDATE".equals(operation)) { // 更新 DataSourceContext DatasourceInfo datasourceInfo = box.getObject("value", DatasourceInfo.class); if (datasourceInfo.getType().equals(0)) { String datasourceCode = datasourceInfo.getDatasourceCode(); DataSourceContext.setConfigMap(datasourceCode, datasourceInfo); DataSourceContext.setClientMap(datasourceInfo); log.info("redis 监听到数据源 {} 新增或更新,更新 DataSourceContext 完成", datasourceCode); } } else { String datasourceCode = box.getString("value"); // 更新 DataSourceContext DataSourceContext.removeConfigMap(datasourceCode); DataSourceContext.removeClientMap(datasourceCode); log.info("redis 监听到数据源 {} 删除,更新 DataSourceContext 完成", datasourceCode); } } }

6.创建AOP自动监听数据源变化

import com.alibaba.fastjson.JSONObject; import com.ws.websocket.entity.DatasourceInfo; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.Aspect; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; /** * @Author: wangsheng * @Data: 2022/8/15 16:37 */ @Slf4j @Aspect @Component public class DatasourceAspect { @Resource private StringRedisTemplate stringRedisTemplate; /** * 新增或编辑数据源时发布 Redis 消息 */ @AfterReturning(value = "execution(* com.ws.service.DatasourceInfoService.saveOrUpdateDatasourceInfo(..))", returning = "datasourceInfo") public void saveOrUpdate(JoinPoint joinPoint, DatasourceInfo datasourceInfo) { HashMap<String, Object> box = new HashMap<>(4); box.put("key", "SAVE_OR_UPDATE"); box.put("value", datasourceInfo); // 发布 Redis 消息 stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL",JSONObject.toJSONString(box)); log.info("新增或更新数据源 {} 方法切面发布 Redis 消息完成", datasourceInfo.getDatasourceCode()); } /** * 删除数据源时发布 Redis 消息 */ @AfterReturning(value = "execution(* com.ws.service.DatasourceInfoService.deleteDatasourceInfo(..))", returning = "datasourceCode") public void delete(JoinPoint joinPoint, String datasourceCode) { Map<String, Object> box = new HashMap<>(4); box.put("key", "DELETE"); box.put("value", datasourceCode); // 发布 Redis 消息 stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box)); log.info("删除数据源 {} 方法切面发布Redis消息完成", datasourceCode); } }

这样就解决了数据源连接信息自动加载、更新与同步的问题。但还有一个问题:当数据源(数据库)重启后,缓存的连接会失效,而 AOP 只能拦截数据源配置的新增、修改和删除,无法感知数据源本身的重启。因此还需要一个定时任务定期对数据源进行连接测试,如果连接失效,则重新建立连接并更新到缓存中。

7.创建定时任务

import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.ws.websocket.entity.DatasourceInfo; import com.ws.websocket.service.DatasourceInfoService; import com.ws.websocket.util.DBTool; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; @Component @RequiredArgsConstructor @Slf4j public class DataSourceRetryConnectSchedule { @Resource private DatasourceInfoService datasourceInfoService; @Resource private StringRedisTemplate stringRedisTemplate; //每2小时执行一次 @Scheduled(cron = "0 0 */2 * * ?") public void RetryConnect() { log.info("开始监测数据源连接"); QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("type", 0); List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper); if (CollectionUtils.isEmpty(list)) { return; } for (DatasourceInfo datasourceInfo : list) { Boolean bb = DBTool.testSource(datasourceInfo); if (!bb) { log.info("数据源重连{}"+datasourceInfo.getDatasourceName()); HashMap<String, Object> box = new HashMap<>(4); box.put("key", "SAVE_OR_UPDATE"); box.put("value", datasourceInfo); // 发布 Redis 消息 stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box)); } } } }

标签:

Springboot使用Redis发布订阅自动更新缓存数据源由讯客互联电脑硬件栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Springboot使用Redis发布订阅自动更新缓存数据源