From 417679d1af854567173f1b35b0eb36c3be0be228 Mon Sep 17 00:00:00 2001 From: "castle.zang" Date: Thu, 13 Jan 2022 09:58:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=B7=E7=83=AD=E6=95=B0=E6=8D=AE=E5=88=86?= =?UTF-8?q?=E7=A6=BB--=E9=87=8D=E6=9E=84--=E6=B5=8B=E8=AF=95=E5=AE=8C?= =?UTF-8?q?=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/iservice/base/ICoreDataSourceService.java | 6 +- .../api/iservice/base/IDataSeparatorService.java | 4 +- .../apiservice/configuration/CoreJdbcTemplate.java | 4 +- .../configuration/CoreJdbcTemplateConfig.java | 23 ++-- .../controller/base/DataSourceController.java | 2 +- .../core/apiservice/dao/ICoreDataSourceDao.java | 2 + .../core/apiservice/daoimpl/CoreDataSourceDao.java | 73 +++++++------ .../apiservice/mq/DataSeparatorQueueReceiver.java | 121 +++++++++++---------- .../base/CoreDataSourceServiceImpl.java | 16 ++- .../serviceimpl/base/DataSeparatorServiceImpl.java | 93 +++++++++++----- .../serviceimpl/base/SystemInitService.java | 16 +++ .../core/apiservice/util/SeparatorDataUtil.java | 3 +- pom.xml | 20 ++-- 13 files changed, 225 insertions(+), 158 deletions(-) diff --git a/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/ICoreDataSourceService.java b/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/ICoreDataSourceService.java index fc51dc8..039503f 100644 --- a/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/ICoreDataSourceService.java +++ b/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/ICoreDataSourceService.java @@ -50,7 +50,7 @@ public interface ICoreDataSourceService { * @param source */ @ApiOperation(value = "数据源唯一检查") - void checkBfDataSourceOnly(CoreDataSource source); + void checkDataSourceOnly(CoreDataSource source); /** @@ -82,4 +82,8 @@ public interface ICoreDataSourceService { */ List list(); + /** + * 加载数据源 + */ + void initCoreDataSource(); } diff --git a/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/IDataSeparatorService.java b/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/IDataSeparatorService.java index 51b7950..6653443 100644 --- a/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/IDataSeparatorService.java +++ b/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/base/IDataSeparatorService.java @@ -2,7 +2,6 @@ package cn.estsh.i3plus.core.api.iservice.base; import cn.estsh.i3plus.pojo.base.bean.ListPager; import cn.estsh.i3plus.pojo.base.common.Pager; -import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository; import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule; @@ -17,10 +16,9 @@ public interface IDataSeparatorService { /** * 进行数据分离 - * @param baseRepository * @param msg */ - void doSeparate(BaseRepository baseRepository, DataSeparatorMessage msg); + void doSeparate(DataSeparatorMessage msg); /** * 添加分离规则 diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/CoreJdbcTemplate.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/CoreJdbcTemplate.java index 1d46c33..54b41bd 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/CoreJdbcTemplate.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/CoreJdbcTemplate.java @@ -1,6 +1,6 @@ package cn.estsh.i3plus.core.apiservice.configuration; -import cn.estsh.i3plus.pojo.wms.bean.datasource.CusDatasource; +import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource; import cn.estsh.impp.framework.boot.exception.ImppExceptionBuilder; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -18,7 +18,7 @@ import java.sql.*; @Slf4j @Data public class CoreJdbcTemplate { - private CusDatasource source; + private CoreDataSource source; private NamedParameterJdbcTemplate coreJdbcTemplate; diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/CoreJdbcTemplateConfig.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/CoreJdbcTemplateConfig.java index 956de45..bf25221 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/CoreJdbcTemplateConfig.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/configuration/CoreJdbcTemplateConfig.java @@ -5,7 +5,6 @@ import cn.estsh.i3plus.platform.common.exception.ImppExceptionEnum; import cn.estsh.i3plus.platform.common.util.CommonConstWords; import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil; import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource; -import cn.estsh.i3plus.pojo.wms.bean.datasource.CusDatasource; import cn.estsh.impp.framework.boot.exception.ImppExceptionBuilder; import cn.estsh.impp.framework.boot.util.ImppRedis; import com.zaxxer.hikari.HikariDataSource; @@ -65,7 +64,7 @@ public class CoreJdbcTemplateConfig { private AnnotationConfigServletWebServerApplicationContext context; - @Resource(name = CommonConstWords.IMPP_REDIS_WMS) + @Resource(name = CommonConstWords.IMPP_REDIS_RES) private ImppRedis redis; @Autowired @@ -79,15 +78,15 @@ public class CoreJdbcTemplateConfig { removeJdbcTemplateBean(coreDatasource); } } catch (Exception e) { - log.error("Error occurred when removing the older cusDataSource", e); + log.error("Error occurred when removing the older coreDataSource", e); } - log.info("Spring Ioc Init CusDatasource to Ioc:{}", coreDatasource); + log.info("Spring Ioc Init coreDataSource to Ioc:{}", coreDatasource); NamedParameterJdbcTemplate namedParameterJdbcTemplate = getNamedParameterJdbcTemplate(coreDatasource); BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(CoreJdbcTemplate.class); beanDefinitionBuilder.addPropertyValue("source", coreDatasource); - beanDefinitionBuilder.addPropertyValue("wmsJdbcTemplate", namedParameterJdbcTemplate); + beanDefinitionBuilder.addPropertyValue("coreJdbcTemplate", namedParameterJdbcTemplate); //存入redis redis.putObject(CORE_CACHE_POOL_PREFIX + coreDatasource.getId(), coreDatasource); @@ -100,7 +99,7 @@ public class CoreJdbcTemplateConfig { } /** - * 获取wms的数据源链接,根据与redis中的缓存做对比,是否修改了数据源 + * 获取core的数据源链接,根据与redis中的缓存做对比,是否修改了数据源 * * @param key * @return @@ -156,16 +155,16 @@ public class CoreJdbcTemplateConfig { * @param cusDataSource * @return */ - private NamedParameterJdbcTemplate getNamedParameterJdbcTemplate(CoreDataSource cusDataSource) { - log.info("【数据源】创建数据源cusDatasource:{}", cusDataSource); - CommonEnumUtil.DATA_SOURCE_TYPE type = CommonEnumUtil.DATA_SOURCE_TYPE.valueOf(cusDataSource.getSourceType()); + private NamedParameterJdbcTemplate getNamedParameterJdbcTemplate(CoreDataSource coreDataSource) { + log.info("【数据源】创建数据源coreDataSource:{}", coreDataSource); + CommonEnumUtil.DATA_SOURCE_TYPE type = CommonEnumUtil.DATA_SOURCE_TYPE.valueOf(coreDataSource.getSourceType()); DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create(); dataSourceBuilder.type(HikariDataSource.class); dataSourceBuilder.driverClassName(type.getDriverClassName()); - dataSourceBuilder.url(type.getJDBCUrl(cusDataSource.getSourceDataBaseName(), cusDataSource.getSourceHost(), cusDataSource.getSourcePort())); - dataSourceBuilder.username(cusDataSource.getSourceUserName()); - dataSourceBuilder.password(cusDataSource.getSourcePassword()); + dataSourceBuilder.url(type.getJDBCUrl(coreDataSource.getSourceDataBaseName(), coreDataSource.getSourceHost(), coreDataSource.getSourcePort())); + dataSourceBuilder.username(coreDataSource.getSourceUserName()); + dataSourceBuilder.password(coreDataSource.getSourcePassword()); HikariDataSource dataSource = (HikariDataSource) dataSourceBuilder.build(); //连接池大小 diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/base/DataSourceController.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/base/DataSourceController.java index bc3e178..bb9fe3d 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/base/DataSourceController.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/base/DataSourceController.java @@ -61,7 +61,7 @@ public class DataSourceController { .notNull("sourcePassword", coreDatasource.getSourcePassword()) .notNull("sourceDataBaseName", coreDatasource.getSourceDataBaseName()); ConvertBean.modelInitialize(coreDatasource, AuthUtil.getSessionUser()); - datasourceService.checkBfDataSourceOnly(coreDatasource); + datasourceService.checkDataSourceOnly(coreDatasource); coreDatasource = datasourceService.saveDataSource(coreDatasource); return ResultBean.success("操作成功").setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode()).setResultObject(coreDatasource); } catch (ImppBusiException exception) { diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/dao/ICoreDataSourceDao.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/dao/ICoreDataSourceDao.java index fffb579..b3b01aa 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/dao/ICoreDataSourceDao.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/dao/ICoreDataSourceDao.java @@ -1,6 +1,7 @@ package cn.estsh.i3plus.core.apiservice.dao; import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource; +import org.springframework.stereotype.Repository; import java.util.List; @@ -11,6 +12,7 @@ import java.util.List; * @CreateDate : 2022/1/7 13:36 * @Modify: **/ +@Repository public interface ICoreDataSourceDao { /** diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/daoimpl/CoreDataSourceDao.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/daoimpl/CoreDataSourceDao.java index 8cd5781..6a7be99 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/daoimpl/CoreDataSourceDao.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/daoimpl/CoreDataSourceDao.java @@ -1,36 +1,37 @@ -package cn.estsh.i3plus.core.apiservice.daoimpl; - -import cn.estsh.i3plus.core.apiservice.dao.ICoreDataSourceDao; -import cn.estsh.i3plus.core.apiservice.util.PlatformEnumUtil; -import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil; -import cn.estsh.i3plus.pojo.base.enumutil.WmsEnumUtil; -import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.persistence.EntityManager; -import javax.persistence.Query; -import java.util.List; - -/** - * @Description : - * @Reference : - * @Author : Castle - * @CreateDate : 2022/1/7 13:38 - * @Modify: - **/ -@Component -public class CoreDataSourceDao implements ICoreDataSourceDao { - @Autowired - private EntityManager entityManager; - - @Override - public List listAllIdAndSourceName() { - String hql = "select new CusDatasource(id,sourceName) from CoreDatasource where isDeleted = :is_deleted and isValid = :is_valid and sourceStatus = :sourceStatus"; - Query query = entityManager.createQuery(hql); - query.setParameter("is_deleted", CommonEnumUtil.TRUE_OR_FALSE.FALSE.getValue()); - query.setParameter("is_valid", CommonEnumUtil.TRUE_OR_FALSE.FALSE.getValue()); - query.setParameter("sourceStatus", PlatformEnumUtil.DATA_SOURCE_STATUS.CONN_SUCCESS.getValue()); - return query.getResultList(); - } -} +//package cn.estsh.i3plus.core.apiservice.daoimpl; +// +//import cn.estsh.i3plus.core.apiservice.dao.ICoreDataSourceDao; +//import cn.estsh.i3plus.core.apiservice.util.PlatformEnumUtil; +//import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil; +//import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource; +//import cn.estsh.impp.framework.boot.auth.AuthUtil; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.stereotype.Service; +// +//import javax.persistence.EntityManager; +//import javax.persistence.Query; +//import java.util.List; +// +///** +// * @Description : +// * @Reference : +// * @Author : Castle +// * @CreateDate : 2022/1/7 13:38 +// * @Modify: +// **/ +//@Service +//public class CoreDataSourceDao implements ICoreDataSourceDao { +// @Autowired +// private EntityManager entityManager; +// +// @Override +// public List listAllIdAndSourceName() { +// String hql = "select new CoreDataSource(id,sourceName) from CoreDatasource where isDeleted = :is_deleted and isValid = :is_valid and organizeCode = :organizeCode and sourceStatus = :sourceStatus"; +// Query query = entityManager.createQuery(hql); +// query.setParameter("organizeCode", AuthUtil.getOrganize().getOrganizeCode()); +// query.setParameter("is_deleted", CommonEnumUtil.TRUE_OR_FALSE.FALSE.getValue()); +// query.setParameter("is_valid", CommonEnumUtil.TRUE_OR_FALSE.FALSE.getValue()); +// query.setParameter("sourceStatus", PlatformEnumUtil.DATA_SOURCE_STATUS.CONN_SUCCESS.getValue()); +// return query.getResultList(); +// } +//} diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/DataSeparatorQueueReceiver.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/DataSeparatorQueueReceiver.java index 93c38a3..c5bff08 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/DataSeparatorQueueReceiver.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/DataSeparatorQueueReceiver.java @@ -3,32 +3,23 @@ package cn.estsh.i3plus.core.apiservice.mq; import cn.estsh.i3plus.core.apiservice.serviceimpl.base.DataSeparatorServiceImpl; import cn.estsh.i3plus.platform.common.tool.JsonUtilTool; import cn.estsh.i3plus.platform.common.util.PlatformConstWords; -import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository; import cn.estsh.i3plus.pojo.base.jpa.daoimpl.BaseRepositoryImpl; import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; -import cn.estsh.impp.framework.boot.util.SpringContextsUtil; -import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.aop.framework.AdvisedSupport; -import org.springframework.aop.framework.AopProxy; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.lang.reflect.Type; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.Objects; /** * @Description : @@ -62,56 +53,69 @@ public class DataSeparatorQueueReceiver { @RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_DATA_SEPARATOR,concurrency = "5") public Boolean consumer(String data, Channel channel, Message message) { DataSeparatorMessage msg = JsonUtilTool.decode(data, DataSeparatorMessage.class); - String refClass = msg.getRefClass(); - BaseRepository repository; +// String refClass = msg.getRefClass(); +// BaseRepository repository; //找到项目中bean的repository - if (REPOSITORY_HASH_MAP.containsKey(refClass)) { - repository = REPOSITORY_HASH_MAP.get(refClass); - } else { - - Map beansRepositories = SpringContextsUtil.getBeansOfType(BaseRepository.class); - List collect = beansRepositories.values().stream().filter(item -> { - String clazz = null; - try { - //第一层代理 - Field h = item.getClass().getSuperclass().getDeclaredField("h"); - h.setAccessible(true); - AopProxy o = (AopProxy) h.get(item); - Field advised = o.getClass().getDeclaredField("advised"); - advised.setAccessible(true); - Object target = ((AdvisedSupport) advised.get(o)).getTargetSource().getTarget(); - - //第二层代理 - Field h1 = target.getClass().getSuperclass().getDeclaredField("h"); - h1.setAccessible(true); - AopProxy aopProxy = (AopProxy) h1.get(target); - Field advised1 = aopProxy.getClass().getDeclaredField("advised"); - advised1.setAccessible(true); - Object target1 = ((AdvisedSupport) advised1.get(aopProxy)).getTargetSource().getTarget(); - - Field persistentClass = target1.getClass().getDeclaredField("persistentClass"); - persistentClass.setAccessible(true); - clazz = persistentClass.get(target1).toString(); - } catch (NoSuchFieldException | IllegalAccessException e) { - return false; - } catch (Exception e) { - LOGGER.error("通用数据分离,获取repository异常 :{}", e.getMessage()); - } - - return clazz.substring(clazz.lastIndexOf(".") + 1).equals(refClass.substring(refClass.lastIndexOf(".") + 1)); - - }).collect(Collectors.toList()); - - if (!collect.isEmpty()) { - repository = collect.get(0); - } else { - LOGGER.error("{}:找不到对应的RepositoryBean!", refClass); - return false; - } - } - +// if (REPOSITORY_HASH_MAP.containsKey(refClass)) { +// repository = REPOSITORY_HASH_MAP.get(refClass); +// } else { +// +// Map beansRepositories = SpringContextsUtil.getBeansOfType(BaseRepository.class); +// List collect = beansRepositories.values().stream().filter(item -> { +// String clazz = null; +// try { +// //第一层代理 +// Field h = item.getClass().getSuperclass().getDeclaredField("h"); +// h.setAccessible(true); +// AopProxy o = (AopProxy) h.get(item); +// Field advised = o.getClass().getDeclaredField("advised"); +// advised.setAccessible(true); +// Object target = ((AdvisedSupport) advised.get(o)).getTargetSource().getTarget(); +// +// //第二层代理 +// Field h1 = target.getClass().getSuperclass().getDeclaredField("h"); +// h1.setAccessible(true); +// AopProxy aopProxy = (AopProxy) h1.get(target); +// Field advised1 = aopProxy.getClass().getDeclaredField("advised"); +// advised1.setAccessible(true); +// Object target1 = ((AdvisedSupport) advised1.get(aopProxy)).getTargetSource().getTarget(); +// +// Field persistentClass = target1.getClass().getDeclaredField("persistentClass"); +// persistentClass.setAccessible(true); +// clazz = persistentClass.get(target1).toString(); +// } catch (NoSuchFieldException | IllegalAccessException e) { +// return false; +// } catch (Exception e) { +// LOGGER.error("通用数据分离,获取repository异常 :{}", e.getMessage()); +// } +// +// return clazz.substring(clazz.lastIndexOf(".") + 1).equals(refClass.substring(refClass.lastIndexOf(".") + 1)); +// +// }).collect(Collectors.toList()); +// +// if (!collect.isEmpty()) { +// repository = collect.get(0); +// } else { +// LOGGER.error("{}:找不到对应的RepositoryBean!", refClass); +// return false; +// } +// } +// +// try { +// dataSeparatorService.doSeparate(repository, msg); +// } catch (Exception e) { +// LOGGER.error("通用冷热数据分离失败,失败原因:{}", e.getMessage()); +// } finally { +// try { +// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); +// } catch (IOException e) { +// LOGGER.error("通用冷热数据分离失败,返回ACK失败:{}", e.getMessage()); +// } +// } try { - dataSeparatorService.doSeparate(repository, msg); + if (!Objects.isNull(msg)){ + dataSeparatorService.doSeparate(msg); + } } catch (Exception e) { LOGGER.error("通用冷热数据分离失败,失败原因:{}", e.getMessage()); } finally { @@ -121,7 +125,6 @@ public class DataSeparatorQueueReceiver { LOGGER.error("通用冷热数据分离失败,返回ACK失败:{}", e.getMessage()); } } - return true; } } \ No newline at end of file diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/CoreDataSourceServiceImpl.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/CoreDataSourceServiceImpl.java index 269f491..57f96d9 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/CoreDataSourceServiceImpl.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/CoreDataSourceServiceImpl.java @@ -10,7 +10,6 @@ import cn.estsh.i3plus.pojo.base.bean.ListPager; import cn.estsh.i3plus.pojo.base.common.Pager; import cn.estsh.i3plus.pojo.base.common.PagerHelper; import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil; -import cn.estsh.i3plus.pojo.base.enumutil.WmsEnumUtil; import cn.estsh.i3plus.pojo.base.tool.DdlPreparedPack; import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource; import cn.estsh.i3plus.pojo.platform.repository.CoreDataSourceRepository; @@ -38,6 +37,7 @@ public class CoreDataSourceServiceImpl implements ICoreDataSourceService{ @Autowired private CoreJdbcTemplateConfig jdbcConfig; + @Override public CoreDataSource updateDataObject(CoreDataSource dataObject) { coreDataSourceRepository.update(dataObject); @@ -47,7 +47,6 @@ public class CoreDataSourceServiceImpl implements ICoreDataSourceService{ @Override public CoreDataSource saveDataSource(CoreDataSource source) { - ConvertBean.saveOrUpdate(source, AuthUtil.getSessionUser().getUserName()); coreDataSourceRepository.insert(source); if (jdbcConfig.checkConnection(source)) { source.setSourceStatus(PlatformEnumUtil.DATA_SOURCE_STATUS.CONN_SUCCESS.getValue()); @@ -65,7 +64,7 @@ public class CoreDataSourceServiceImpl implements ICoreDataSourceService{ @Override - public void checkBfDataSourceOnly(CoreDataSource source) { + public void checkDataSourceOnly(CoreDataSource source) { DdlPackBean ddlPackBean = DdlPackBean.getDdlPackBean(); DdlPreparedPack.getStringEqualPack(source.getSourceCode(), "sourceCode", ddlPackBean); int count = coreDataSourceRepository.findByHqlWhereCount(ddlPackBean); @@ -123,6 +122,15 @@ public class CoreDataSourceServiceImpl implements ICoreDataSourceService{ @Override public List list() { - return null; + DdlPackBean ddlPackBean = DdlPackBean.getDdlPackBean(); + DdlPreparedPack.getStringEqualPack(AuthUtil.getOrganizeCode(), "organizeCode", ddlPackBean); + DdlPreparedPack.getNumEqualPack(PlatformEnumUtil.DATA_SOURCE_STATUS.CONN_SUCCESS.getValue(), "sourceStatus", ddlPackBean); + return coreDataSourceRepository.findByHqlWhere(ddlPackBean); + } + + @Override + public void initCoreDataSource() { + List coreDataSourceList = coreDataSourceRepository.list(); + coreDataSourceList.forEach(source -> jdbcConfig.checkConnection(source)); } } diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/DataSeparatorServiceImpl.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/DataSeparatorServiceImpl.java index 0ed26ae..e0fd3d2 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/DataSeparatorServiceImpl.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/DataSeparatorServiceImpl.java @@ -10,7 +10,6 @@ import cn.estsh.i3plus.pojo.base.bean.DdlPackBean; import cn.estsh.i3plus.pojo.base.bean.ListPager; import cn.estsh.i3plus.pojo.base.common.Pager; import cn.estsh.i3plus.pojo.base.common.PagerHelper; -import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository; import cn.estsh.i3plus.pojo.base.tool.DdlPreparedPack; import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule; @@ -23,8 +22,11 @@ import com.alibaba.druid.sql.SQLUtils; import com.alibaba.druid.sql.ast.SQLStatement; import com.alibaba.druid.sql.visitor.ExportParameterizedOutputVisitor; import com.alibaba.druid.util.JdbcConstants; +import com.alibaba.fastjson.JSON; import com.google.common.base.Strings; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.stereotype.Service; @@ -32,6 +34,7 @@ import javax.annotation.Resource; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * @Description : @@ -41,6 +44,7 @@ import java.util.Map; * @Modify: **/ @Service +@Slf4j public class DataSeparatorServiceImpl implements IDataSeparatorService { @Resource(name = CommonConstWords.IMPP_REDIS_RES) @@ -56,28 +60,28 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService { private SeparatorDataUtil separatorDataUtil; @Override - public void doSeparate(BaseRepository baseRepository, DataSeparatorMessage msg) { - //获取数据 - Long id = msg.getId(); - Object bean = baseRepository.getById(id); + public void doSeparate(DataSeparatorMessage msg) { + //获取数据来源数据源 + CoreJdbcTemplate srcJdbcTemplate = coreJdbcTemplateConfig.getCoreJdbcTemplate(msg.getSrcDataSourceId()); //获取目的地的数据源 - CoreJdbcTemplate coreJdbcTemplate = coreJdbcTemplateConfig.getCoreJdbcTemplate(msg.getDestDataSourceId()); + CoreJdbcTemplate destJdbcTemplate = coreJdbcTemplateConfig.getCoreJdbcTemplate(msg.getDestDataSourceId()); + //迁移数据 try { - execute(bean, msg,coreJdbcTemplate); + execute(msg,srcJdbcTemplate,destJdbcTemplate); } catch (Exception e) { throw ImppExceptionBuilder.newInstance().setErrorDetail(e.getMessage()+":separate data failed").build(); } //删除原数据 - baseRepository.deleteById(id); +// baseRepository.deleteById(id); } @Override public boolean addRule(DataSeparatorRule msg) { ConvertBean.saveOrUpdate(msg, AuthUtil.getSessionUser().getUserName()); DataSeparatorRule rule = dataSeparatorRepository.insert(msg); - String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + rule.getRefBeanName().toUpperCase(); + String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + rule.getTableNameSrc().toUpperCase(); redisRes.putObject(redisKey, rule, -1); return true; } @@ -85,7 +89,7 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService { @Override public boolean deleteRule(Long id) { DataSeparatorRule rule = dataSeparatorRepository.getById(id); - String redisKey = CommonConstWords.RES_DATA_SEPARATOR + rule.getRefBeanName().toUpperCase(); + String redisKey = CommonConstWords.RES_DATA_SEPARATOR + rule.getTableNameSrc().toUpperCase(); dataSeparatorRepository.deleteWeaklyById(id, AuthUtil.getSessionUser().getUserName()); redisRes.deleteKey(redisKey); return true; @@ -95,7 +99,7 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService { public boolean update(DataSeparatorRule msg) { ConvertBean.saveOrUpdate(msg, AuthUtil.getSessionUser().getUserName()); dataSeparatorRepository.update(msg); - String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + msg.getRefBeanName().toUpperCase(); + String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + msg.getTableNameSrc().toUpperCase(); redisRes.putObject(redisKey, msg, -1); return true; } @@ -104,12 +108,26 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService { public ListPager query(DataSeparatorRule rule, Pager pager) { DdlPackBean ddlPackBean = DdlPackBean.getDdlPackBean(rule.getOrganizeCode()); - if (Strings.isNullOrEmpty(rule.getRefBeanName())) { - DdlPreparedPack.getStringLeftLikerPack(rule.getRefBeanName(), "refBeanName", ddlPackBean); + if (!Strings.isNullOrEmpty(rule.getTableNameSrc())) { + DdlPreparedPack.getStringRightLikerPack(rule.getTableNameSrc(), "tableNameSrc", ddlPackBean); } - - if (Strings.isNullOrEmpty(rule.getRuleColumn())) { - DdlPreparedPack.getStringLeftLikerPack(rule.getRuleColumn(), "ruleColumn", ddlPackBean); + if (!Strings.isNullOrEmpty(rule.getRuleColumn())) { + DdlPreparedPack.getStringRightLikerPack(rule.getRuleColumn(), "ruleColumn", ddlPackBean); + } + if (!Strings.isNullOrEmpty(rule.getRule())) { + DdlPreparedPack.getStringRightLikerPack(rule.getRule(), "rule", ddlPackBean); + } + if (!Objects.isNull(rule.getDestDataSourceId())) { + DdlPreparedPack.getNumEqualPack(rule.getDestDataSourceId(), "destDataSourceId", ddlPackBean); + } + if (!Objects.isNull(rule.getDestDataSourceId())) { + DdlPreparedPack.getNumEqualPack(rule.getSrcDataSourceId(), "srcDataSourceId", ddlPackBean); + } + if (!Strings.isNullOrEmpty(rule.getTableNameDest())) { + DdlPreparedPack.getStringRightLikerPack(rule.getTableNameDest(), "tableNameDest", ddlPackBean); + } + if (!Strings.isNullOrEmpty(rule.getSeparator())) { + DdlPreparedPack.getStringEqualPack(rule.getSeparator(), "separator", ddlPackBean); } pager = PagerHelper.getPager(pager, dataSeparatorRepository.findByHqlWhereCount(ddlPackBean)); @@ -120,17 +138,25 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService { public void separatorRedisInit() { //启动加载所有的数据到redis中 List list = dataSeparatorRepository.list(); - list.forEach(item -> { - String refBeanName = item.getRefBeanName().toUpperCase(); - String redisKey = CommonConstWords.RES_DATA_SEPARATOR + refBeanName; - redisRes.putObject(redisKey, item, -1); - }); + if (list.size() > 0){ + list.forEach(item -> { + String refBeanName = item.getTableNameSrc().toUpperCase(); + String redisKey = CommonConstWords.RES_DATA_SEPARATOR + refBeanName; + redisRes.putObject(redisKey, item, -1); + }); + } } - private void execute(Object bean,DataSeparatorMessage msg,CoreJdbcTemplate coreJdbcTemplate) throws Exception { - String insertSql = separatorDataUtil.packSql(bean, msg.getDestTableName()); - NamedParameterJdbcTemplate jdbcTemplate = coreJdbcTemplate.getNamedParameterJdbcTemplate(); + private void execute(DataSeparatorMessage msg,CoreJdbcTemplate srcJdbcTemplate,CoreJdbcTemplate descJdbcTemplate) throws Exception { + //1.查询数据 + String selectSql = "select * from " + msg.getSrcTableName() + " where id = " + msg.getId(); + HashMap paramMap = new HashMap<>(64); + Map resultMap = srcJdbcTemplate.getNamedParameterJdbcTemplate().getJdbcTemplate().queryForMap(selectSql); + paramMap.putAll(resultMap); + //2.迁移数据 + String insertSql = separatorDataUtil.packInsertSql(paramMap, msg.getDestTableName()); + NamedParameterJdbcTemplate jdbcTemplate = descJdbcTemplate.getNamedParameterJdbcTemplate(); boolean isSqlServer = isMSSQLSERVER(jdbcTemplate); DbType dbType; if (isSqlServer) { @@ -138,11 +164,22 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService { } else { dbType = JdbcConstants.MYSQL; } - Map resultMap = parseSQL(insertSql, dbType); - String preparedSql = resultMap.get("preparedSql").toString(); + Map tempResult = parseSQL(insertSql, dbType); + String preparedSql = tempResult.get("preparedSql").toString(); @SuppressWarnings("unchecked") - List parameters = (List) resultMap.get("parameters"); - jdbcTemplate.getJdbcTemplate().update(preparedSql,parameters.toArray()); + List parameters = (List) tempResult.get("parameters"); + try { + jdbcTemplate.getJdbcTemplate().update(preparedSql,parameters.toArray()); + } catch (DuplicateKeyException e) { + log.info("迁移数据已经保存到目标库中,数据为:{}", JSON.toJSONString(resultMap)); + //3.删除原数据 + String deleteSql = "delete from " + msg.getSrcTableName() + " where id = :id "; + srcJdbcTemplate.getNamedParameterJdbcTemplate().update(deleteSql,paramMap); + } + + //4.删除原数据 + String deleteSql = "delete from " + msg.getSrcTableName() + " where id = :id "; + srcJdbcTemplate.getNamedParameterJdbcTemplate().update(deleteSql,paramMap); } diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/SystemInitService.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/SystemInitService.java index 451b679..fbcb7f6 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/SystemInitService.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/base/SystemInitService.java @@ -1,5 +1,7 @@ package cn.estsh.i3plus.core.apiservice.serviceimpl.base; +import cn.estsh.i3plus.core.api.iservice.base.ICoreDataSourceService; +import cn.estsh.i3plus.core.api.iservice.base.IDataSeparatorService; import cn.estsh.i3plus.core.api.iservice.base.ISystemInitService; import cn.estsh.i3plus.core.api.iservice.busi.*; import cn.estsh.i3plus.core.apiservice.controller.busi.SysSmsSendRecordController; @@ -76,6 +78,12 @@ public class SystemInitService implements ISystemInitService { @Autowired private ISysSmsSendRecordService sysSmsSendRecordService; + @Autowired + private IDataSeparatorService dataSeparatorService; + + @Autowired + private ICoreDataSourceService iCoreDataSourceService; + @Resource(name= CommonConstWords.IMPP_REDIS_RES) private ImppRedis redisRes; @@ -119,6 +127,14 @@ public class SystemInitService implements ISystemInitService { LOGGER.info("创建 短信同步线程"); createSmsSyncThread(); LOGGER.info("创建 短信同步线程 完成"); + + LOGGER.info("【通用数据冷热分离初始化开始...】"); + dataSeparatorService.separatorRedisInit(); + LOGGER.info("【通用数据冷热分离初始化结束...】"); + + LOGGER.info("【通用数据冷热分离初始化数据源开始...】"); + iCoreDataSourceService.initCoreDataSource(); + LOGGER.info("【通用数据冷热分离初始化数据源结束...】"); }catch (Exception e){ e.printStackTrace(); } diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/util/SeparatorDataUtil.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/util/SeparatorDataUtil.java index 7e0d4b7..d032351 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/util/SeparatorDataUtil.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/util/SeparatorDataUtil.java @@ -31,8 +31,7 @@ public class SeparatorDataUtil { * @param destTableName * @return */ - public String packSql(Object bean, String destTableName) { - HashMap objMap = getObjMap(bean); + public String packInsertSql(HashMap objMap, String destTableName) { String insertSQL = baseSqlService.packInsertSQL(destTableName, objMap); return insertSQL; } diff --git a/pom.xml b/pom.xml index 5370878..6bafe83 100644 --- a/pom.xml +++ b/pom.xml @@ -170,11 +170,11 @@ ${project.dependency.version} - - i3plus.platform - i3plus-platform-plugin - ${project.dependency.version} - + + + + + i3plus.pojo i3plus-pojo-platform @@ -191,11 +191,11 @@ ${project.dependency.version} - - i3plus.pojo - i3plus-pojo-mes - ${project.dependency.version} - + + + + +