冷热数据分离--重构--测试完成

yun-zuoyi
castle.zang 3 years ago
parent 7976427f42
commit 417679d1af

@ -50,7 +50,7 @@ public interface ICoreDataSourceService {
* @param source * @param source
*/ */
@ApiOperation(value = "数据源唯一检查") @ApiOperation(value = "数据源唯一检查")
void checkBfDataSourceOnly(CoreDataSource source); void checkDataSourceOnly(CoreDataSource source);
/** /**
@ -82,4 +82,8 @@ public interface ICoreDataSourceService {
*/ */
List<CoreDataSource> list(); List<CoreDataSource> list();
/**
*
*/
void initCoreDataSource();
} }

@ -2,7 +2,6 @@ package cn.estsh.i3plus.core.api.iservice.base;
import cn.estsh.i3plus.pojo.base.bean.ListPager; import cn.estsh.i3plus.pojo.base.bean.ListPager;
import cn.estsh.i3plus.pojo.base.common.Pager; import cn.estsh.i3plus.pojo.base.common.Pager;
import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule; import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule;
@ -17,10 +16,9 @@ public interface IDataSeparatorService {
/** /**
* *
* @param baseRepository
* @param msg * @param msg
*/ */
void doSeparate(BaseRepository baseRepository, DataSeparatorMessage msg); void doSeparate(DataSeparatorMessage msg);
/** /**
* *

@ -1,6 +1,6 @@
package cn.estsh.i3plus.core.apiservice.configuration; package cn.estsh.i3plus.core.apiservice.configuration;
import cn.estsh.i3plus.pojo.wms.bean.datasource.CusDatasource; import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource;
import cn.estsh.impp.framework.boot.exception.ImppExceptionBuilder; import cn.estsh.impp.framework.boot.exception.ImppExceptionBuilder;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -18,7 +18,7 @@ import java.sql.*;
@Slf4j @Slf4j
@Data @Data
public class CoreJdbcTemplate { public class CoreJdbcTemplate {
private CusDatasource source; private CoreDataSource source;
private NamedParameterJdbcTemplate coreJdbcTemplate; private NamedParameterJdbcTemplate coreJdbcTemplate;

@ -5,7 +5,6 @@ import cn.estsh.i3plus.platform.common.exception.ImppExceptionEnum;
import cn.estsh.i3plus.platform.common.util.CommonConstWords; import cn.estsh.i3plus.platform.common.util.CommonConstWords;
import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil; import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil;
import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource; import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource;
import cn.estsh.i3plus.pojo.wms.bean.datasource.CusDatasource;
import cn.estsh.impp.framework.boot.exception.ImppExceptionBuilder; import cn.estsh.impp.framework.boot.exception.ImppExceptionBuilder;
import cn.estsh.impp.framework.boot.util.ImppRedis; import cn.estsh.impp.framework.boot.util.ImppRedis;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
@ -65,7 +64,7 @@ public class CoreJdbcTemplateConfig {
private AnnotationConfigServletWebServerApplicationContext context; private AnnotationConfigServletWebServerApplicationContext context;
@Resource(name = CommonConstWords.IMPP_REDIS_WMS) @Resource(name = CommonConstWords.IMPP_REDIS_RES)
private ImppRedis redis; private ImppRedis redis;
@Autowired @Autowired
@ -79,15 +78,15 @@ public class CoreJdbcTemplateConfig {
removeJdbcTemplateBean(coreDatasource); removeJdbcTemplateBean(coreDatasource);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Error occurred when removing the older cusDataSource", e); log.error("Error occurred when removing the older coreDataSource", e);
} }
log.info("Spring Ioc Init CusDatasource to Ioc:{}", coreDatasource); log.info("Spring Ioc Init coreDataSource to Ioc:{}", coreDatasource);
NamedParameterJdbcTemplate namedParameterJdbcTemplate = getNamedParameterJdbcTemplate(coreDatasource); NamedParameterJdbcTemplate namedParameterJdbcTemplate = getNamedParameterJdbcTemplate(coreDatasource);
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(CoreJdbcTemplate.class); BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(CoreJdbcTemplate.class);
beanDefinitionBuilder.addPropertyValue("source", coreDatasource); beanDefinitionBuilder.addPropertyValue("source", coreDatasource);
beanDefinitionBuilder.addPropertyValue("wmsJdbcTemplate", namedParameterJdbcTemplate); beanDefinitionBuilder.addPropertyValue("coreJdbcTemplate", namedParameterJdbcTemplate);
//存入redis //存入redis
redis.putObject(CORE_CACHE_POOL_PREFIX + coreDatasource.getId(), coreDatasource); redis.putObject(CORE_CACHE_POOL_PREFIX + coreDatasource.getId(), coreDatasource);
@ -100,7 +99,7 @@ public class CoreJdbcTemplateConfig {
} }
/** /**
* wmsredis * coreredis
* *
* @param key * @param key
* @return * @return
@ -156,16 +155,16 @@ public class CoreJdbcTemplateConfig {
* @param cusDataSource * @param cusDataSource
* @return * @return
*/ */
private NamedParameterJdbcTemplate getNamedParameterJdbcTemplate(CoreDataSource cusDataSource) { private NamedParameterJdbcTemplate getNamedParameterJdbcTemplate(CoreDataSource coreDataSource) {
log.info("【数据源】创建数据源cusDatasource{}", cusDataSource); log.info("【数据源】创建数据源coreDataSource{}", coreDataSource);
CommonEnumUtil.DATA_SOURCE_TYPE type = CommonEnumUtil.DATA_SOURCE_TYPE.valueOf(cusDataSource.getSourceType()); CommonEnumUtil.DATA_SOURCE_TYPE type = CommonEnumUtil.DATA_SOURCE_TYPE.valueOf(coreDataSource.getSourceType());
DataSourceBuilder<?> dataSourceBuilder = DataSourceBuilder.create(); DataSourceBuilder<?> dataSourceBuilder = DataSourceBuilder.create();
dataSourceBuilder.type(HikariDataSource.class); dataSourceBuilder.type(HikariDataSource.class);
dataSourceBuilder.driverClassName(type.getDriverClassName()); dataSourceBuilder.driverClassName(type.getDriverClassName());
dataSourceBuilder.url(type.getJDBCUrl(cusDataSource.getSourceDataBaseName(), cusDataSource.getSourceHost(), cusDataSource.getSourcePort())); dataSourceBuilder.url(type.getJDBCUrl(coreDataSource.getSourceDataBaseName(), coreDataSource.getSourceHost(), coreDataSource.getSourcePort()));
dataSourceBuilder.username(cusDataSource.getSourceUserName()); dataSourceBuilder.username(coreDataSource.getSourceUserName());
dataSourceBuilder.password(cusDataSource.getSourcePassword()); dataSourceBuilder.password(coreDataSource.getSourcePassword());
HikariDataSource dataSource = (HikariDataSource) dataSourceBuilder.build(); HikariDataSource dataSource = (HikariDataSource) dataSourceBuilder.build();
//连接池大小 //连接池大小

@ -61,7 +61,7 @@ public class DataSourceController {
.notNull("sourcePassword", coreDatasource.getSourcePassword()) .notNull("sourcePassword", coreDatasource.getSourcePassword())
.notNull("sourceDataBaseName", coreDatasource.getSourceDataBaseName()); .notNull("sourceDataBaseName", coreDatasource.getSourceDataBaseName());
ConvertBean.modelInitialize(coreDatasource, AuthUtil.getSessionUser()); ConvertBean.modelInitialize(coreDatasource, AuthUtil.getSessionUser());
datasourceService.checkBfDataSourceOnly(coreDatasource); datasourceService.checkDataSourceOnly(coreDatasource);
coreDatasource = datasourceService.saveDataSource(coreDatasource); coreDatasource = datasourceService.saveDataSource(coreDatasource);
return ResultBean.success("操作成功").setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode()).setResultObject(coreDatasource); return ResultBean.success("操作成功").setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode()).setResultObject(coreDatasource);
} catch (ImppBusiException exception) { } catch (ImppBusiException exception) {

@ -1,6 +1,7 @@
package cn.estsh.i3plus.core.apiservice.dao; package cn.estsh.i3plus.core.apiservice.dao;
import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource; import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource;
import org.springframework.stereotype.Repository;
import java.util.List; import java.util.List;
@ -11,6 +12,7 @@ import java.util.List;
* @CreateDate : 2022/1/7 13:36 * @CreateDate : 2022/1/7 13:36
* @Modify: * @Modify:
**/ **/
@Repository
public interface ICoreDataSourceDao { public interface ICoreDataSourceDao {
/** /**

@ -1,36 +1,37 @@
package cn.estsh.i3plus.core.apiservice.daoimpl; //package cn.estsh.i3plus.core.apiservice.daoimpl;
//
import cn.estsh.i3plus.core.apiservice.dao.ICoreDataSourceDao; //import cn.estsh.i3plus.core.apiservice.dao.ICoreDataSourceDao;
import cn.estsh.i3plus.core.apiservice.util.PlatformEnumUtil; //import cn.estsh.i3plus.core.apiservice.util.PlatformEnumUtil;
import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil; //import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil;
import cn.estsh.i3plus.pojo.base.enumutil.WmsEnumUtil; //import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource;
import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource; //import cn.estsh.impp.framework.boot.auth.AuthUtil;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Service;
//
import javax.persistence.EntityManager; //import javax.persistence.EntityManager;
import javax.persistence.Query; //import javax.persistence.Query;
import java.util.List; //import java.util.List;
//
/** ///**
* @Description : // * @Description :
* @Reference : // * @Reference :
* @Author : Castle // * @Author : Castle
* @CreateDate : 2022/1/7 13:38 // * @CreateDate : 2022/1/7 13:38
* @Modify: // * @Modify:
**/ // **/
@Component //@Service
public class CoreDataSourceDao implements ICoreDataSourceDao { //public class CoreDataSourceDao implements ICoreDataSourceDao {
@Autowired // @Autowired
private EntityManager entityManager; // private EntityManager entityManager;
//
@Override // @Override
public List<CoreDataSource> listAllIdAndSourceName() { // public List<CoreDataSource> listAllIdAndSourceName() {
String hql = "select new CusDatasource(id,sourceName) from CoreDatasource where isDeleted = :is_deleted and isValid = :is_valid and sourceStatus = :sourceStatus"; // String hql = "select new CoreDataSource(id,sourceName) from CoreDatasource where isDeleted = :is_deleted and isValid = :is_valid and organizeCode = :organizeCode and sourceStatus = :sourceStatus";
Query query = entityManager.createQuery(hql); // Query query = entityManager.createQuery(hql);
query.setParameter("is_deleted", CommonEnumUtil.TRUE_OR_FALSE.FALSE.getValue()); // query.setParameter("organizeCode", AuthUtil.getOrganize().getOrganizeCode());
query.setParameter("is_valid", CommonEnumUtil.TRUE_OR_FALSE.FALSE.getValue()); // query.setParameter("is_deleted", CommonEnumUtil.TRUE_OR_FALSE.FALSE.getValue());
query.setParameter("sourceStatus", PlatformEnumUtil.DATA_SOURCE_STATUS.CONN_SUCCESS.getValue()); // query.setParameter("is_valid", CommonEnumUtil.TRUE_OR_FALSE.FALSE.getValue());
return query.getResultList(); // query.setParameter("sourceStatus", PlatformEnumUtil.DATA_SOURCE_STATUS.CONN_SUCCESS.getValue());
} // return query.getResultList();
} // }
//}

@ -3,32 +3,23 @@ package cn.estsh.i3plus.core.apiservice.mq;
import cn.estsh.i3plus.core.apiservice.serviceimpl.base.DataSeparatorServiceImpl; import cn.estsh.i3plus.core.apiservice.serviceimpl.base.DataSeparatorServiceImpl;
import cn.estsh.i3plus.platform.common.tool.JsonUtilTool; import cn.estsh.i3plus.platform.common.tool.JsonUtilTool;
import cn.estsh.i3plus.platform.common.util.PlatformConstWords; import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository;
import cn.estsh.i3plus.pojo.base.jpa.daoimpl.BaseRepositoryImpl; import cn.estsh.i3plus.pojo.base.jpa.daoimpl.BaseRepositoryImpl;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage;
import cn.estsh.impp.framework.boot.util.SpringContextsUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.aop.framework.AdvisedSupport;
import org.springframework.aop.framework.AopProxy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.Objects;
/** /**
* @Description : * @Description :
@ -62,56 +53,69 @@ public class DataSeparatorQueueReceiver {
@RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_DATA_SEPARATOR,concurrency = "5") @RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_DATA_SEPARATOR,concurrency = "5")
public Boolean consumer(String data, Channel channel, Message message) { public Boolean consumer(String data, Channel channel, Message message) {
DataSeparatorMessage msg = JsonUtilTool.decode(data, DataSeparatorMessage.class); DataSeparatorMessage msg = JsonUtilTool.decode(data, DataSeparatorMessage.class);
String refClass = msg.getRefClass(); // String refClass = msg.getRefClass();
BaseRepository repository; // BaseRepository repository;
//找到项目中bean的repository //找到项目中bean的repository
if (REPOSITORY_HASH_MAP.containsKey(refClass)) { // if (REPOSITORY_HASH_MAP.containsKey(refClass)) {
repository = REPOSITORY_HASH_MAP.get(refClass); // repository = REPOSITORY_HASH_MAP.get(refClass);
} else { // } else {
//
Map<String, BaseRepository> beansRepositories = SpringContextsUtil.getBeansOfType(BaseRepository.class); // Map<String, BaseRepository> beansRepositories = SpringContextsUtil.getBeansOfType(BaseRepository.class);
List<BaseRepository> collect = beansRepositories.values().stream().filter(item -> { // List<BaseRepository> collect = beansRepositories.values().stream().filter(item -> {
String clazz = null; // String clazz = null;
try { // try {
//第一层代理 // //第一层代理
Field h = item.getClass().getSuperclass().getDeclaredField("h"); // Field h = item.getClass().getSuperclass().getDeclaredField("h");
h.setAccessible(true); // h.setAccessible(true);
AopProxy o = (AopProxy) h.get(item); // AopProxy o = (AopProxy) h.get(item);
Field advised = o.getClass().getDeclaredField("advised"); // Field advised = o.getClass().getDeclaredField("advised");
advised.setAccessible(true); // advised.setAccessible(true);
Object target = ((AdvisedSupport) advised.get(o)).getTargetSource().getTarget(); // Object target = ((AdvisedSupport) advised.get(o)).getTargetSource().getTarget();
//
//第二层代理 // //第二层代理
Field h1 = target.getClass().getSuperclass().getDeclaredField("h"); // Field h1 = target.getClass().getSuperclass().getDeclaredField("h");
h1.setAccessible(true); // h1.setAccessible(true);
AopProxy aopProxy = (AopProxy) h1.get(target); // AopProxy aopProxy = (AopProxy) h1.get(target);
Field advised1 = aopProxy.getClass().getDeclaredField("advised"); // Field advised1 = aopProxy.getClass().getDeclaredField("advised");
advised1.setAccessible(true); // advised1.setAccessible(true);
Object target1 = ((AdvisedSupport) advised1.get(aopProxy)).getTargetSource().getTarget(); // Object target1 = ((AdvisedSupport) advised1.get(aopProxy)).getTargetSource().getTarget();
//
Field persistentClass = target1.getClass().getDeclaredField("persistentClass"); // Field persistentClass = target1.getClass().getDeclaredField("persistentClass");
persistentClass.setAccessible(true); // persistentClass.setAccessible(true);
clazz = persistentClass.get(target1).toString(); // clazz = persistentClass.get(target1).toString();
} catch (NoSuchFieldException | IllegalAccessException e) { // } catch (NoSuchFieldException | IllegalAccessException e) {
return false; // return false;
} catch (Exception e) { // } catch (Exception e) {
LOGGER.error("通用数据分离,获取repository异常 :{}", e.getMessage()); // LOGGER.error("通用数据分离,获取repository异常 :{}", e.getMessage());
} // }
//
return clazz.substring(clazz.lastIndexOf(".") + 1).equals(refClass.substring(refClass.lastIndexOf(".") + 1)); // return clazz.substring(clazz.lastIndexOf(".") + 1).equals(refClass.substring(refClass.lastIndexOf(".") + 1));
//
}).collect(Collectors.toList()); // }).collect(Collectors.toList());
//
if (!collect.isEmpty()) { // if (!collect.isEmpty()) {
repository = collect.get(0); // repository = collect.get(0);
} else { // } else {
LOGGER.error("{}:找不到对应的RepositoryBean!", refClass); // LOGGER.error("{}:找不到对应的RepositoryBean!", refClass);
return false; // return false;
} // }
} // }
//
// try {
// dataSeparatorService.doSeparate(repository, msg);
// } catch (Exception e) {
// LOGGER.error("通用冷热数据分离失败,失败原因:{}", e.getMessage());
// } finally {
// try {
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// } catch (IOException e) {
// LOGGER.error("通用冷热数据分离失败返回ACK失败:{}", e.getMessage());
// }
// }
try { try {
dataSeparatorService.doSeparate(repository, msg); if (!Objects.isNull(msg)){
dataSeparatorService.doSeparate(msg);
}
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("通用冷热数据分离失败,失败原因:{}", e.getMessage()); LOGGER.error("通用冷热数据分离失败,失败原因:{}", e.getMessage());
} finally { } finally {
@ -121,7 +125,6 @@ public class DataSeparatorQueueReceiver {
LOGGER.error("通用冷热数据分离失败返回ACK失败:{}", e.getMessage()); LOGGER.error("通用冷热数据分离失败返回ACK失败:{}", e.getMessage());
} }
} }
return true; return true;
} }
} }

@ -10,7 +10,6 @@ import cn.estsh.i3plus.pojo.base.bean.ListPager;
import cn.estsh.i3plus.pojo.base.common.Pager; import cn.estsh.i3plus.pojo.base.common.Pager;
import cn.estsh.i3plus.pojo.base.common.PagerHelper; import cn.estsh.i3plus.pojo.base.common.PagerHelper;
import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil; import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil;
import cn.estsh.i3plus.pojo.base.enumutil.WmsEnumUtil;
import cn.estsh.i3plus.pojo.base.tool.DdlPreparedPack; import cn.estsh.i3plus.pojo.base.tool.DdlPreparedPack;
import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource; import cn.estsh.i3plus.pojo.platform.bean.CoreDataSource;
import cn.estsh.i3plus.pojo.platform.repository.CoreDataSourceRepository; import cn.estsh.i3plus.pojo.platform.repository.CoreDataSourceRepository;
@ -38,6 +37,7 @@ public class CoreDataSourceServiceImpl implements ICoreDataSourceService{
@Autowired @Autowired
private CoreJdbcTemplateConfig jdbcConfig; private CoreJdbcTemplateConfig jdbcConfig;
@Override @Override
public CoreDataSource updateDataObject(CoreDataSource dataObject) { public CoreDataSource updateDataObject(CoreDataSource dataObject) {
coreDataSourceRepository.update(dataObject); coreDataSourceRepository.update(dataObject);
@ -47,7 +47,6 @@ public class CoreDataSourceServiceImpl implements ICoreDataSourceService{
@Override @Override
public CoreDataSource saveDataSource(CoreDataSource source) { public CoreDataSource saveDataSource(CoreDataSource source) {
ConvertBean.saveOrUpdate(source, AuthUtil.getSessionUser().getUserName());
coreDataSourceRepository.insert(source); coreDataSourceRepository.insert(source);
if (jdbcConfig.checkConnection(source)) { if (jdbcConfig.checkConnection(source)) {
source.setSourceStatus(PlatformEnumUtil.DATA_SOURCE_STATUS.CONN_SUCCESS.getValue()); source.setSourceStatus(PlatformEnumUtil.DATA_SOURCE_STATUS.CONN_SUCCESS.getValue());
@ -65,7 +64,7 @@ public class CoreDataSourceServiceImpl implements ICoreDataSourceService{
@Override @Override
public void checkBfDataSourceOnly(CoreDataSource source) { public void checkDataSourceOnly(CoreDataSource source) {
DdlPackBean ddlPackBean = DdlPackBean.getDdlPackBean(); DdlPackBean ddlPackBean = DdlPackBean.getDdlPackBean();
DdlPreparedPack.getStringEqualPack(source.getSourceCode(), "sourceCode", ddlPackBean); DdlPreparedPack.getStringEqualPack(source.getSourceCode(), "sourceCode", ddlPackBean);
int count = coreDataSourceRepository.findByHqlWhereCount(ddlPackBean); int count = coreDataSourceRepository.findByHqlWhereCount(ddlPackBean);
@ -123,6 +122,15 @@ public class CoreDataSourceServiceImpl implements ICoreDataSourceService{
@Override @Override
public List<CoreDataSource> list() { public List<CoreDataSource> list() {
return null; DdlPackBean ddlPackBean = DdlPackBean.getDdlPackBean();
DdlPreparedPack.getStringEqualPack(AuthUtil.getOrganizeCode(), "organizeCode", ddlPackBean);
DdlPreparedPack.getNumEqualPack(PlatformEnumUtil.DATA_SOURCE_STATUS.CONN_SUCCESS.getValue(), "sourceStatus", ddlPackBean);
return coreDataSourceRepository.findByHqlWhere(ddlPackBean);
}
@Override
public void initCoreDataSource() {
List<CoreDataSource> coreDataSourceList = coreDataSourceRepository.list();
coreDataSourceList.forEach(source -> jdbcConfig.checkConnection(source));
} }
} }

@ -10,7 +10,6 @@ import cn.estsh.i3plus.pojo.base.bean.DdlPackBean;
import cn.estsh.i3plus.pojo.base.bean.ListPager; import cn.estsh.i3plus.pojo.base.bean.ListPager;
import cn.estsh.i3plus.pojo.base.common.Pager; import cn.estsh.i3plus.pojo.base.common.Pager;
import cn.estsh.i3plus.pojo.base.common.PagerHelper; import cn.estsh.i3plus.pojo.base.common.PagerHelper;
import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository;
import cn.estsh.i3plus.pojo.base.tool.DdlPreparedPack; import cn.estsh.i3plus.pojo.base.tool.DdlPreparedPack;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage; import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule; import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule;
@ -23,8 +22,11 @@ import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement; import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.visitor.ExportParameterizedOutputVisitor; import com.alibaba.druid.sql.visitor.ExportParameterizedOutputVisitor;
import com.alibaba.druid.util.JdbcConstants; import com.alibaba.druid.util.JdbcConstants;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -32,6 +34,7 @@ import javax.annotation.Resource;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
/** /**
* @Description : * @Description :
@ -41,6 +44,7 @@ import java.util.Map;
* @Modify: * @Modify:
**/ **/
@Service @Service
@Slf4j
public class DataSeparatorServiceImpl implements IDataSeparatorService { public class DataSeparatorServiceImpl implements IDataSeparatorService {
@Resource(name = CommonConstWords.IMPP_REDIS_RES) @Resource(name = CommonConstWords.IMPP_REDIS_RES)
@ -56,28 +60,28 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService {
private SeparatorDataUtil separatorDataUtil; private SeparatorDataUtil separatorDataUtil;
@Override @Override
public void doSeparate(BaseRepository baseRepository, DataSeparatorMessage msg) { public void doSeparate(DataSeparatorMessage msg) {
//获取数据 //获取数据来源数据源
Long id = msg.getId(); CoreJdbcTemplate srcJdbcTemplate = coreJdbcTemplateConfig.getCoreJdbcTemplate(msg.getSrcDataSourceId());
Object bean = baseRepository.getById(id);
//获取目的地的数据源 //获取目的地的数据源
CoreJdbcTemplate coreJdbcTemplate = coreJdbcTemplateConfig.getCoreJdbcTemplate(msg.getDestDataSourceId()); CoreJdbcTemplate destJdbcTemplate = coreJdbcTemplateConfig.getCoreJdbcTemplate(msg.getDestDataSourceId());
//迁移数据 //迁移数据
try { try {
execute(bean, msg,coreJdbcTemplate); execute(msg,srcJdbcTemplate,destJdbcTemplate);
} catch (Exception e) { } catch (Exception e) {
throw ImppExceptionBuilder.newInstance().setErrorDetail(e.getMessage()+":separate data failed").build(); throw ImppExceptionBuilder.newInstance().setErrorDetail(e.getMessage()+":separate data failed").build();
} }
//删除原数据 //删除原数据
baseRepository.deleteById(id); // baseRepository.deleteById(id);
} }
@Override @Override
public boolean addRule(DataSeparatorRule msg) { public boolean addRule(DataSeparatorRule msg) {
ConvertBean.saveOrUpdate(msg, AuthUtil.getSessionUser().getUserName()); ConvertBean.saveOrUpdate(msg, AuthUtil.getSessionUser().getUserName());
DataSeparatorRule rule = dataSeparatorRepository.insert(msg); DataSeparatorRule rule = dataSeparatorRepository.insert(msg);
String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + rule.getRefBeanName().toUpperCase(); String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + rule.getTableNameSrc().toUpperCase();
redisRes.putObject(redisKey, rule, -1); redisRes.putObject(redisKey, rule, -1);
return true; return true;
} }
@ -85,7 +89,7 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService {
@Override @Override
public boolean deleteRule(Long id) { public boolean deleteRule(Long id) {
DataSeparatorRule rule = dataSeparatorRepository.getById(id); DataSeparatorRule rule = dataSeparatorRepository.getById(id);
String redisKey = CommonConstWords.RES_DATA_SEPARATOR + rule.getRefBeanName().toUpperCase(); String redisKey = CommonConstWords.RES_DATA_SEPARATOR + rule.getTableNameSrc().toUpperCase();
dataSeparatorRepository.deleteWeaklyById(id, AuthUtil.getSessionUser().getUserName()); dataSeparatorRepository.deleteWeaklyById(id, AuthUtil.getSessionUser().getUserName());
redisRes.deleteKey(redisKey); redisRes.deleteKey(redisKey);
return true; return true;
@ -95,7 +99,7 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService {
public boolean update(DataSeparatorRule msg) { public boolean update(DataSeparatorRule msg) {
ConvertBean.saveOrUpdate(msg, AuthUtil.getSessionUser().getUserName()); ConvertBean.saveOrUpdate(msg, AuthUtil.getSessionUser().getUserName());
dataSeparatorRepository.update(msg); dataSeparatorRepository.update(msg);
String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + msg.getRefBeanName().toUpperCase(); String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + msg.getTableNameSrc().toUpperCase();
redisRes.putObject(redisKey, msg, -1); redisRes.putObject(redisKey, msg, -1);
return true; return true;
} }
@ -104,12 +108,26 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService {
public ListPager<DataSeparatorRule> query(DataSeparatorRule rule, Pager pager) { public ListPager<DataSeparatorRule> query(DataSeparatorRule rule, Pager pager) {
DdlPackBean ddlPackBean = DdlPackBean.getDdlPackBean(rule.getOrganizeCode()); DdlPackBean ddlPackBean = DdlPackBean.getDdlPackBean(rule.getOrganizeCode());
if (Strings.isNullOrEmpty(rule.getRefBeanName())) { if (!Strings.isNullOrEmpty(rule.getTableNameSrc())) {
DdlPreparedPack.getStringLeftLikerPack(rule.getRefBeanName(), "refBeanName", ddlPackBean); DdlPreparedPack.getStringRightLikerPack(rule.getTableNameSrc(), "tableNameSrc", ddlPackBean);
} }
if (!Strings.isNullOrEmpty(rule.getRuleColumn())) {
if (Strings.isNullOrEmpty(rule.getRuleColumn())) { DdlPreparedPack.getStringRightLikerPack(rule.getRuleColumn(), "ruleColumn", ddlPackBean);
DdlPreparedPack.getStringLeftLikerPack(rule.getRuleColumn(), "ruleColumn", ddlPackBean); }
if (!Strings.isNullOrEmpty(rule.getRule())) {
DdlPreparedPack.getStringRightLikerPack(rule.getRule(), "rule", ddlPackBean);
}
if (!Objects.isNull(rule.getDestDataSourceId())) {
DdlPreparedPack.getNumEqualPack(rule.getDestDataSourceId(), "destDataSourceId", ddlPackBean);
}
if (!Objects.isNull(rule.getDestDataSourceId())) {
DdlPreparedPack.getNumEqualPack(rule.getSrcDataSourceId(), "srcDataSourceId", ddlPackBean);
}
if (!Strings.isNullOrEmpty(rule.getTableNameDest())) {
DdlPreparedPack.getStringRightLikerPack(rule.getTableNameDest(), "tableNameDest", ddlPackBean);
}
if (!Strings.isNullOrEmpty(rule.getSeparator())) {
DdlPreparedPack.getStringEqualPack(rule.getSeparator(), "separator", ddlPackBean);
} }
pager = PagerHelper.getPager(pager, dataSeparatorRepository.findByHqlWhereCount(ddlPackBean)); pager = PagerHelper.getPager(pager, dataSeparatorRepository.findByHqlWhereCount(ddlPackBean));
@ -120,17 +138,25 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService {
public void separatorRedisInit() { public void separatorRedisInit() {
//启动加载所有的数据到redis中 //启动加载所有的数据到redis中
List<DataSeparatorRule> list = dataSeparatorRepository.list(); List<DataSeparatorRule> list = dataSeparatorRepository.list();
list.forEach(item -> { if (list.size() > 0){
String refBeanName = item.getRefBeanName().toUpperCase(); list.forEach(item -> {
String redisKey = CommonConstWords.RES_DATA_SEPARATOR + refBeanName; String refBeanName = item.getTableNameSrc().toUpperCase();
redisRes.putObject(redisKey, item, -1); String redisKey = CommonConstWords.RES_DATA_SEPARATOR + refBeanName;
}); redisRes.putObject(redisKey, item, -1);
});
}
} }
private void execute(Object bean,DataSeparatorMessage msg,CoreJdbcTemplate coreJdbcTemplate) throws Exception { private void execute(DataSeparatorMessage msg,CoreJdbcTemplate srcJdbcTemplate,CoreJdbcTemplate descJdbcTemplate) throws Exception {
String insertSql = separatorDataUtil.packSql(bean, msg.getDestTableName()); //1.查询数据
NamedParameterJdbcTemplate jdbcTemplate = coreJdbcTemplate.getNamedParameterJdbcTemplate(); String selectSql = "select * from " + msg.getSrcTableName() + " where id = " + msg.getId();
HashMap<String, Object> paramMap = new HashMap<>(64);
Map<String, Object> resultMap = srcJdbcTemplate.getNamedParameterJdbcTemplate().getJdbcTemplate().queryForMap(selectSql);
paramMap.putAll(resultMap);
//2.迁移数据
String insertSql = separatorDataUtil.packInsertSql(paramMap, msg.getDestTableName());
NamedParameterJdbcTemplate jdbcTemplate = descJdbcTemplate.getNamedParameterJdbcTemplate();
boolean isSqlServer = isMSSQLSERVER(jdbcTemplate); boolean isSqlServer = isMSSQLSERVER(jdbcTemplate);
DbType dbType; DbType dbType;
if (isSqlServer) { if (isSqlServer) {
@ -138,11 +164,22 @@ public class DataSeparatorServiceImpl implements IDataSeparatorService {
} else { } else {
dbType = JdbcConstants.MYSQL; dbType = JdbcConstants.MYSQL;
} }
Map<String, Object> resultMap = parseSQL(insertSql, dbType); Map<String, Object> tempResult = parseSQL(insertSql, dbType);
String preparedSql = resultMap.get("preparedSql").toString(); String preparedSql = tempResult.get("preparedSql").toString();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
List<Object> parameters = (List<Object>) resultMap.get("parameters"); List<Object> parameters = (List<Object>) tempResult.get("parameters");
jdbcTemplate.getJdbcTemplate().update(preparedSql,parameters.toArray()); try {
jdbcTemplate.getJdbcTemplate().update(preparedSql,parameters.toArray());
} catch (DuplicateKeyException e) {
log.info("迁移数据已经保存到目标库中,数据为:{}", JSON.toJSONString(resultMap));
//3.删除原数据
String deleteSql = "delete from " + msg.getSrcTableName() + " where id = :id ";
srcJdbcTemplate.getNamedParameterJdbcTemplate().update(deleteSql,paramMap);
}
//4.删除原数据
String deleteSql = "delete from " + msg.getSrcTableName() + " where id = :id ";
srcJdbcTemplate.getNamedParameterJdbcTemplate().update(deleteSql,paramMap);
} }

@ -1,5 +1,7 @@
package cn.estsh.i3plus.core.apiservice.serviceimpl.base; package cn.estsh.i3plus.core.apiservice.serviceimpl.base;
import cn.estsh.i3plus.core.api.iservice.base.ICoreDataSourceService;
import cn.estsh.i3plus.core.api.iservice.base.IDataSeparatorService;
import cn.estsh.i3plus.core.api.iservice.base.ISystemInitService; import cn.estsh.i3plus.core.api.iservice.base.ISystemInitService;
import cn.estsh.i3plus.core.api.iservice.busi.*; import cn.estsh.i3plus.core.api.iservice.busi.*;
import cn.estsh.i3plus.core.apiservice.controller.busi.SysSmsSendRecordController; import cn.estsh.i3plus.core.apiservice.controller.busi.SysSmsSendRecordController;
@ -76,6 +78,12 @@ public class SystemInitService implements ISystemInitService {
@Autowired @Autowired
private ISysSmsSendRecordService sysSmsSendRecordService; private ISysSmsSendRecordService sysSmsSendRecordService;
@Autowired
private IDataSeparatorService dataSeparatorService;
@Autowired
private ICoreDataSourceService iCoreDataSourceService;
@Resource(name= CommonConstWords.IMPP_REDIS_RES) @Resource(name= CommonConstWords.IMPP_REDIS_RES)
private ImppRedis redisRes; private ImppRedis redisRes;
@ -119,6 +127,14 @@ public class SystemInitService implements ISystemInitService {
LOGGER.info("创建 短信同步线程"); LOGGER.info("创建 短信同步线程");
createSmsSyncThread(); createSmsSyncThread();
LOGGER.info("创建 短信同步线程 完成"); LOGGER.info("创建 短信同步线程 完成");
LOGGER.info("【通用数据冷热分离初始化开始...】");
dataSeparatorService.separatorRedisInit();
LOGGER.info("【通用数据冷热分离初始化结束...】");
LOGGER.info("【通用数据冷热分离初始化数据源开始...】");
iCoreDataSourceService.initCoreDataSource();
LOGGER.info("【通用数据冷热分离初始化数据源结束...】");
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
} }

@ -31,8 +31,7 @@ public class SeparatorDataUtil {
* @param destTableName * @param destTableName
* @return * @return
*/ */
public String packSql(Object bean, String destTableName) { public String packInsertSql(HashMap<String, Object> objMap, String destTableName) {
HashMap<String, Object> objMap = getObjMap(bean);
String insertSQL = baseSqlService.packInsertSQL(destTableName, objMap); String insertSQL = baseSqlService.packInsertSQL(destTableName, objMap);
return insertSQL; return insertSQL;
} }

@ -170,11 +170,11 @@
<version>${project.dependency.version}</version> <version>${project.dependency.version}</version>
</dependency> </dependency>
<dependency> <!-- <dependency>-->
<groupId>i3plus.platform</groupId> <!-- <groupId>i3plus.platform</groupId>-->
<artifactId>i3plus-platform-plugin</artifactId> <!-- <artifactId>i3plus-platform-plugin</artifactId>-->
<version>${project.dependency.version}</version> <!-- <version>${project.dependency.version}</version>-->
</dependency> <!-- </dependency>-->
<dependency> <dependency>
<groupId>i3plus.pojo</groupId> <groupId>i3plus.pojo</groupId>
<artifactId>i3plus-pojo-platform</artifactId> <artifactId>i3plus-pojo-platform</artifactId>
@ -191,11 +191,11 @@
<version>${project.dependency.version}</version> <version>${project.dependency.version}</version>
</dependency> </dependency>
<dependency> <!-- <dependency>-->
<groupId>i3plus.pojo</groupId> <!-- <groupId>i3plus.pojo</groupId>-->
<artifactId>i3plus-pojo-mes</artifactId> <!-- <artifactId>i3plus-pojo-mes</artifactId>-->
<version>${project.dependency.version}</version> <!-- <version>${project.dependency.version}</version>-->
</dependency> <!-- </dependency>-->
<!-- 微朝调用 --> <!-- 微朝调用 -->
<dependency> <dependency>

Loading…
Cancel
Save