diff --git a/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/IDataSeparatorService.java b/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/IDataSeparatorService.java new file mode 100644 index 0000000..51b7950 --- /dev/null +++ b/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/IDataSeparatorService.java @@ -0,0 +1,58 @@ +package cn.estsh.i3plus.core.api.iservice.base; + +import cn.estsh.i3plus.pojo.base.bean.ListPager; +import cn.estsh.i3plus.pojo.base.common.Pager; +import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository; +import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; +import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule; + +/** + * @Description :通用数据分离 + * @Reference : + * @Author : Castle + * @CreateDate : 2021/12/3 13:41 + * @Modify: + **/ +public interface IDataSeparatorService { + + /** + * 进行数据分离 + * @param baseRepository + * @param msg + */ + void doSeparate(BaseRepository baseRepository, DataSeparatorMessage msg); + + /** + * 添加分离规则 + * @param msg + * @return + */ + boolean addRule (DataSeparatorRule msg); + + /** + * 需要删除redis中缓存 + * @param id + * @return + */ + boolean deleteRule(Long id); + + /** + * 更新数据库及redis + * @param msg + * @return + */ + boolean update(DataSeparatorRule msg); + + /** + * 根据 + * @param dataSeparatorRule + * @param pager + * @return + */ + ListPager query(DataSeparatorRule dataSeparatorRule, Pager pager); + + /** + * 初始化数据到redis中 + */ + void separatorRedisInit(); +} diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/DataSeparatorInit.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/DataSeparatorInit.java new file mode 100644 index 0000000..076f2db --- /dev/null +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/DataSeparatorInit.java @@ -0,0 +1,34 @@ +package cn.estsh.i3plus.core.apiservice.configuration; + +import cn.estsh.i3plus.core.api.iservice.base.IDataSeparatorService; +import cn.estsh.impp.framework.boot.init.IAppStartSystemInit; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @Description : + * @Reference : + * @Author : Castle + * @CreateDate : 2021/12/7 15:35 + * @Modify: + **/ + +@Slf4j +@Component +public class DataSeparatorInit implements IAppStartSystemInit { + + @Autowired + private IDataSeparatorService dataSeparatorService; + + @Override + public void systemInit() { + try { + log.info("【通用数据冷热分离初始化开始...】"); + dataSeparatorService.separatorRedisInit(); + log.info("【通用数据冷热分离初始化结束...】"); + } catch (Exception e) { + log.error("【通用数据冷热分离初始化失败】,失败原因:{}", e.getMessage()); + } + } +} diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/base/DataSeparatorController.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/base/DataSeparatorController.java new file mode 100644 index 0000000..be37eb1 --- /dev/null +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/base/DataSeparatorController.java @@ -0,0 +1,83 @@ +package cn.estsh.i3plus.core.apiservice.controller.base; + +import cn.estsh.i3plus.core.api.iservice.base.IDataSeparatorService; +import cn.estsh.i3plus.platform.common.util.PlatformConstWords; +import cn.estsh.i3plus.pojo.base.bean.ListPager; +import cn.estsh.i3plus.pojo.base.common.Pager; +import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule; +import cn.estsh.impp.framework.boot.exception.ImppBusiException; +import cn.estsh.impp.framework.boot.exception.ImppExceptionBuilder; +import cn.estsh.impp.framework.boot.util.ResultBean; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +/** + * @Description : + * @Reference : + * @Author : Castle + * @CreateDate : 2021/12/3 16:38 + * @Modify: + **/ +@Api(tags = "通用冷热数据分离管理") +@RestController +@RequestMapping(PlatformConstWords.BASE_URL + "/separator") +public class DataSeparatorController { + + @Autowired + private IDataSeparatorService dataSeparatorService; + + @GetMapping("/query") + @ApiOperation(notes = "根据条件查询数据分离规则", value = "根据条件查询数据分离规则") + public ResultBean query(DataSeparatorRule dataSeparatorRule, Pager pager) { + try { + ListPager result = dataSeparatorService.query(dataSeparatorRule, pager); + return ResultBean.success("false").setListPager(result).setMsg("查询成功"); + } catch (ImppBusiException busExcep) { + return ResultBean.fail(busExcep).build(); + } catch (Exception e) { + return ImppExceptionBuilder.newInstance().buildExceptionResult(e); + } + } + + @PostMapping("/add") + @ApiOperation(notes = "添加数据分离规则", value = "添加数据分离规则") + public ResultBean add(@RequestBody DataSeparatorRule dataSeparatorRule) { + try { + dataSeparatorService.addRule(dataSeparatorRule); + return ResultBean.success("添加成功"); + } catch (ImppBusiException busExcep) { + return ResultBean.fail(busExcep).build(); + } catch (Exception e) { + return ImppExceptionBuilder.newInstance().buildExceptionResult(e); + } + + } + + @DeleteMapping("/delete/{id}") + @ApiOperation(notes = "逻辑删除数据分离规则", value = "逻辑删除数据分离规则") + public ResultBean delete(@PathVariable Long id) { + try { + dataSeparatorService.deleteRule(id); + return ResultBean.success("删除成功"); + } catch (ImppBusiException busExcep) { + return ResultBean.fail(busExcep).build(); + } catch (Exception e) { + return ImppExceptionBuilder.newInstance().buildExceptionResult(e); + } + } + + @PutMapping("/update") + @ApiOperation(notes = "更新数据分离规则", value = "更新数据分离规则") + public ResultBean update(@RequestBody DataSeparatorRule dataSeparatorRule) { + try { + dataSeparatorService.update(dataSeparatorRule); + return ResultBean.success("修改成功"); + } catch (ImppBusiException busExcep) { + return ResultBean.fail(busExcep).build(); + } catch (Exception e) { + return ImppExceptionBuilder.newInstance().buildExceptionResult(e); + } + } +} diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/DataSeparatorQueueReceiver.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/DataSeparatorQueueReceiver.java index 0304696..93c38a3 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/DataSeparatorQueueReceiver.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/DataSeparatorQueueReceiver.java @@ -1,20 +1,34 @@ package cn.estsh.i3plus.core.apiservice.mq; -import cn.estsh.i3plus.platform.common.util.CommonConstWords; -import cn.estsh.impp.framework.boot.util.ImppRedis; +import cn.estsh.i3plus.core.apiservice.serviceimpl.base.DataSeparatorServiceImpl; +import cn.estsh.i3plus.platform.common.tool.JsonUtilTool; +import cn.estsh.i3plus.platform.common.util.PlatformConstWords; +import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository; +import cn.estsh.i3plus.pojo.base.jpa.daoimpl.BaseRepositoryImpl; +import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; +import cn.estsh.impp.framework.boot.util.SpringContextsUtil; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.aop.framework.AdvisedSupport; +import org.springframework.aop.framework.AopProxy; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; - -import static cn.estsh.i3plus.platform.common.util.PlatformConstWords.QUEUE_IMPP_DATA_SEPARATOR; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * @Description : @@ -24,13 +38,15 @@ import static cn.estsh.i3plus.platform.common.util.PlatformConstWords.QUEUE_IMPP * @Modify: **/ @Configuration -@ConditionalOnExpression(" '${impp.mq.queue.data.separator:false}' == 'true' ") +@ConditionalOnExpression(" '${impp.aspect.data.separation:false}' == 'true' ") public class DataSeparatorQueueReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(DataSeparatorQueueReceiver.class); - @Resource(name = CommonConstWords.IMPP_REDIS_RES) - private ImppRedis redisRes; + private static final Map REPOSITORY_HASH_MAP = new HashMap<>(); + + @Resource + private DataSeparatorServiceImpl dataSeparatorService; /** * QUEUE_SWEB_NOTICE 队列 @@ -39,14 +55,73 @@ public class DataSeparatorQueueReceiver { * @throws Exception */ @Bean - public Queue getQueueSwebNoticeQueue() { - return new Queue(QUEUE_IMPP_DATA_SEPARATOR); + public Queue getDataSeparatorQueue() { + return new Queue(PlatformConstWords.QUEUE_IMPP_DATA_SEPARATOR); } - @RabbitListener(queues = QUEUE_IMPP_DATA_SEPARATOR) - public Boolean consumer(Channel channel, Message message){ - byte[] msg = message.getBody(); + @RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_DATA_SEPARATOR,concurrency = "5") + public Boolean consumer(String data, Channel channel, Message message) { + DataSeparatorMessage msg = JsonUtilTool.decode(data, DataSeparatorMessage.class); + String refClass = msg.getRefClass(); + BaseRepository repository; + //找到项目中bean的repository + if (REPOSITORY_HASH_MAP.containsKey(refClass)) { + repository = REPOSITORY_HASH_MAP.get(refClass); + } else { + + Map beansRepositories = SpringContextsUtil.getBeansOfType(BaseRepository.class); + List collect = beansRepositories.values().stream().filter(item -> { + String clazz = null; + try { + //第一层代理 + Field h = item.getClass().getSuperclass().getDeclaredField("h"); + h.setAccessible(true); + AopProxy o = (AopProxy) h.get(item); + Field advised = o.getClass().getDeclaredField("advised"); + advised.setAccessible(true); + Object target = ((AdvisedSupport) advised.get(o)).getTargetSource().getTarget(); + + //第二层代理 + Field h1 = target.getClass().getSuperclass().getDeclaredField("h"); + h1.setAccessible(true); + AopProxy aopProxy = (AopProxy) h1.get(target); + Field advised1 = aopProxy.getClass().getDeclaredField("advised"); + advised1.setAccessible(true); + Object target1 = ((AdvisedSupport) advised1.get(aopProxy)).getTargetSource().getTarget(); + + Field persistentClass = target1.getClass().getDeclaredField("persistentClass"); + persistentClass.setAccessible(true); + clazz = persistentClass.get(target1).toString(); + } catch (NoSuchFieldException | IllegalAccessException e) { + return false; + } catch (Exception e) { + LOGGER.error("通用数据分离,获取repository异常 :{}", e.getMessage()); + } + + return clazz.substring(clazz.lastIndexOf(".") + 1).equals(refClass.substring(refClass.lastIndexOf(".") + 1)); + + }).collect(Collectors.toList()); + + if (!collect.isEmpty()) { + repository = collect.get(0); + } else { + LOGGER.error("{}:找不到对应的RepositoryBean!", refClass); + return false; + } + } + + try { + dataSeparatorService.doSeparate(repository, msg); + } catch (Exception e) { + LOGGER.error("通用冷热数据分离失败,失败原因:{}", e.getMessage()); + } finally { + try { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (IOException e) { + LOGGER.error("通用冷热数据分离失败,返回ACK失败:{}", e.getMessage()); + } + } - return false; + return true; } -} +} \ No newline at end of file diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/DataSeparatorServiceImpl.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/DataSeparatorServiceImpl.java new file mode 100644 index 0000000..74e986c --- /dev/null +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/DataSeparatorServiceImpl.java @@ -0,0 +1,117 @@ +package cn.estsh.i3plus.core.apiservice.serviceimpl.base; + +import cn.estsh.i3plus.core.api.iservice.base.IDataSeparatorService; +import cn.estsh.i3plus.core.apiservice.util.strategy.IDataSeparatorStrategy; +import cn.estsh.i3plus.platform.common.convert.ConvertBean; +import cn.estsh.i3plus.platform.common.util.CommonConstWords; +import cn.estsh.i3plus.pojo.base.bean.DdlPackBean; +import cn.estsh.i3plus.pojo.base.bean.ListPager; +import cn.estsh.i3plus.pojo.base.common.Pager; +import cn.estsh.i3plus.pojo.base.common.PagerHelper; +import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil; +import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository; +import cn.estsh.i3plus.pojo.base.tool.DdlPreparedPack; +import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; +import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule; +import cn.estsh.i3plus.pojo.platform.repository.DataSeparatorRepository; +import cn.estsh.i3plus.pojo.platform.sqlpack.CoreHqlPack; +import cn.estsh.impp.framework.boot.auth.AuthUtil; +import cn.estsh.impp.framework.boot.util.ImppRedis; +import cn.estsh.impp.framework.boot.util.SpringContextsUtil; +import com.google.common.base.Strings; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.List; + +/** + * @Description : + * @Reference : + * @Author : Castle + * @CreateDate : 2021/12/3 13:44 + * @Modify: + **/ +@Service +public class DataSeparatorServiceImpl implements IDataSeparatorService { + + @Resource(name = CommonConstWords.IMPP_REDIS_RES) + private ImppRedis redisRes; + + @Resource + private DataSeparatorRepository dataSeparatorRepository; + + @Override + public void doSeparate(BaseRepository baseRepository, DataSeparatorMessage msg) { + //获取数据 + Long id = msg.getId(); + Object bean = baseRepository.getById(id); + //获取策略 + String strategy = ImppEnumUtil.DATA_SEPARATOR_STRATEGY.codeOfStrategyName(msg.getClassification()); + IDataSeparatorStrategy separator = (IDataSeparatorStrategy) SpringContextsUtil.getBean(strategy); + //迁移数据 + separator.execute(bean, msg); + //删除原数据 + baseRepository.deleteById(id); + } + + @Override + public boolean addRule(DataSeparatorRule msg) { + ConvertBean.saveOrUpdate(msg, AuthUtil.getSessionUser().getUserName()); + DataSeparatorRule rule = dataSeparatorRepository.insert(msg); + String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + rule.getRefBeanName().toUpperCase(); + redisRes.putObject(redisKey, rule, -1); + return true; + } + + @Override + public boolean deleteRule(Long id) { + DataSeparatorRule rule = dataSeparatorRepository.getById(id); + String redisKey = CommonConstWords.RES_DATA_SEPARATOR + rule.getRefBeanName().toUpperCase(); + dataSeparatorRepository.deleteWeaklyById(id, AuthUtil.getSessionUser().getUserName()); + redisRes.deleteKey(redisKey); + return true; + } + + @Override + public boolean update(DataSeparatorRule msg) { + ConvertBean.saveOrUpdate(msg, AuthUtil.getSessionUser().getUserName()); + dataSeparatorRepository.update(msg); + String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + msg.getRefBeanName().toUpperCase(); + redisRes.putObject(redisKey, msg, -1); + return true; + } + + @Override + public ListPager query(DataSeparatorRule rule, Pager pager) { + DdlPackBean ddlPackBean = DdlPackBean.getDdlPackBean(rule.getOrganizeCode()); + if (Strings.isNullOrEmpty(rule.getClassification())) { + DdlPreparedPack.getStringEqualPack(rule.getClassification(), "classification", ddlPackBean); + } + + if (Strings.isNullOrEmpty(rule.getRefBeanName())) { + DdlPreparedPack.getStringLeftLikerPack(rule.getRefBeanName(), "refBeanName", ddlPackBean); + } + + if (Strings.isNullOrEmpty(rule.getDatabaseName())) { + DdlPreparedPack.getStringLeftLikerPack(rule.getDatabaseName(), "databaseName", ddlPackBean); + } + + if (Strings.isNullOrEmpty(rule.getRuleColumn())) { + DdlPreparedPack.getStringLeftLikerPack(rule.getRuleColumn(), "ruleColumn", ddlPackBean); + } + pager = PagerHelper.getPager(pager, dataSeparatorRepository.findByHqlWhereCount(ddlPackBean)); + + return new ListPager(dataSeparatorRepository.findByHqlPage(ddlPackBean, pager), pager); + } + + @Override + public void separatorRedisInit() { + //启动加载所有的数据到redis中 + List list = dataSeparatorRepository.list(); + list.forEach(item -> { + String refBeanName = item.getRefBeanName().toUpperCase(); + String redisKey = CommonConstWords.RES_DATA_SEPARATOR + refBeanName; + redisRes.putObject(redisKey, item, -1); + }); + } +} diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/DataSeparatorStrategyMysql.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/DataSeparatorStrategyMysql.java new file mode 100644 index 0000000..79bdb59 --- /dev/null +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/DataSeparatorStrategyMysql.java @@ -0,0 +1,103 @@ +package cn.estsh.i3plus.core.apiservice.serviceimpl.base; + +import cn.estsh.i3plus.core.apiservice.util.strategy.IDataSeparatorStrategy; +import cn.estsh.i3plus.platform.plugin.datasource.DynamicDataSourceProxy; +import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.persistence.Column; +import java.lang.reflect.Field; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @Description : + * @Reference : + * @Author : Castle + * @CreateDate : 2021/12/3 14:02 + * @Modify: + **/ +@Component +@Slf4j +public class DataSeparatorStrategyMysql implements IDataSeparatorStrategy { + + /** + * 根据目标数据数据的url+数据库名作为key 缓存动态数据源 + */ + public static final Map DYNAMIC_DATA_SOURCE_PROXY_MAP = new HashMap<>(); + + + @Override + public void execute(Object bean, DataSeparatorMessage msg) { + //获取动态数据源 + DynamicDataSourceProxy dynamicDataSource = getDynamicDataSource(msg); + + //封装Map + HashMap objMap = getObjMap(bean); + + //执行sql + String insertSQL = dynamicDataSource.packInsertSQL(msg.getDestTableName(), objMap); + try { + dynamicDataSource.execute(insertSQL); + } catch (SQLException e) { + log.error("执行sql:{}失败,失败原因:{}", insertSQL, e.getMessage()); + } + + + } + + /** + * 获取动态数据源 + * + * @param msg + * @return + */ + private DynamicDataSourceProxy getDynamicDataSource(DataSeparatorMessage msg) { + DynamicDataSourceProxy dataSourceProxy; + String destUrl = msg.getDestUrl(); + String dataBaseName = msg.getDatabaseName(); + String mapKey = destUrl + ":" + dataBaseName; + if (!DYNAMIC_DATA_SOURCE_PROXY_MAP.containsKey(mapKey)) { + String url = "jdbc:mysql://" + destUrl + "/" + dataBaseName + "?autoReconnect=true&useSSL=false&characterEncoding=utf-8"; + dataSourceProxy = DynamicDataSourceProxy.initDataSourceFactory("com.mysql.jdbc.Driver",url,msg.getUserName(),msg.getPassword()); + DYNAMIC_DATA_SOURCE_PROXY_MAP.put(mapKey, dataSourceProxy); + } + dataSourceProxy = DYNAMIC_DATA_SOURCE_PROXY_MAP.get(mapKey); + return dataSourceProxy; + } + + /** + * 获取封装的k-v值 + * + * @param bean + * @return + */ + private HashMap getObjMap(Object bean) { + Field[] declaredFields = bean.getClass().getDeclaredFields(); + Field[] parentFields = bean.getClass().getSuperclass().getDeclaredFields(); + HashMap objMap = new HashMap<>(declaredFields.length); + List list = new ArrayList<>(); + list.add(declaredFields); + list.add(parentFields); + list.forEach(item -> { + for (Field field : item) { + field.setAccessible(true); + if (null != field.getAnnotation(Column.class)) { + Column annotation = field.getAnnotation(Column.class); + String columnName = annotation.name(); + try { + Object value = field.get(bean); + objMap.put(columnName, value); + } catch (IllegalAccessException e) { + log.error("冷热分离Mysql封装数据错误:{}!", columnName); + } + } + } + }); + return objMap; + } +} diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/util/strategy/IDataSeparatorStrategy.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/util/strategy/IDataSeparatorStrategy.java new file mode 100644 index 0000000..92d4e82 --- /dev/null +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/util/strategy/IDataSeparatorStrategy.java @@ -0,0 +1,20 @@ +package cn.estsh.i3plus.core.apiservice.util.strategy; + +import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; + +/** + * @Description : + * @Reference : + * @Author : Castle + * @CreateDate : 2021/12/3 13:59 + * @Modify: + **/ +public interface IDataSeparatorStrategy { + + /** + * 所有策略方法实现本方法,进行数据分离 + * @param bean + * @param msg + */ + void execute(Object bean, DataSeparatorMessage msg); +}