通用数据冷热分离代码,并规范化test分支开发时的依赖,全部依赖1.0-TEST-SNAPSHOT

yun-zuoyi
castle.zang 3 years ago
parent b85a9fd55a
commit 0711496499

@ -0,0 +1,58 @@
package cn.estsh.i3plus.core.api.iservice.base;
import cn.estsh.i3plus.pojo.base.bean.ListPager;
import cn.estsh.i3plus.pojo.base.common.Pager;
import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule;
/**
* @Description :
* @Reference :
* @Author : Castle
* @CreateDate : 2021/12/3 13:41
* @Modify:
**/
public interface IDataSeparatorService {
/**
*
* @param baseRepository
* @param msg
*/
void doSeparate(BaseRepository baseRepository, DataSeparatorMessage msg);
/**
*
* @param msg
* @return
*/
boolean addRule (DataSeparatorRule msg);
/**
* redis
* @param id
* @return
*/
boolean deleteRule(Long id);
/**
* redis
* @param msg
* @return
*/
boolean update(DataSeparatorRule msg);
/**
*
* @param dataSeparatorRule
* @param pager
* @return
*/
ListPager<DataSeparatorRule> query(DataSeparatorRule dataSeparatorRule, Pager pager);
/**
* redis
*/
void separatorRedisInit();
}

@ -0,0 +1,34 @@
package cn.estsh.i3plus.core.apiservice.configuration;
import cn.estsh.i3plus.core.api.iservice.base.IDataSeparatorService;
import cn.estsh.impp.framework.boot.init.IAppStartSystemInit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Description :
* @Reference :
* @Author : Castle
* @CreateDate : 2021/12/7 15:35
* @Modify:
**/
@Slf4j
@Component
public class DataSeparatorInit implements IAppStartSystemInit {
@Autowired
private IDataSeparatorService dataSeparatorService;
@Override
public void systemInit() {
try {
log.info("【通用数据冷热分离初始化开始...】");
dataSeparatorService.separatorRedisInit();
log.info("【通用数据冷热分离初始化结束...】");
} catch (Exception e) {
log.error("【通用数据冷热分离初始化失败】,失败原因:{}", e.getMessage());
}
}
}

@ -0,0 +1,83 @@
package cn.estsh.i3plus.core.apiservice.controller.base;
import cn.estsh.i3plus.core.api.iservice.base.IDataSeparatorService;
import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
import cn.estsh.i3plus.pojo.base.bean.ListPager;
import cn.estsh.i3plus.pojo.base.common.Pager;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule;
import cn.estsh.impp.framework.boot.exception.ImppBusiException;
import cn.estsh.impp.framework.boot.exception.ImppExceptionBuilder;
import cn.estsh.impp.framework.boot.util.ResultBean;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* @Description :
* @Reference :
* @Author : Castle
* @CreateDate : 2021/12/3 16:38
* @Modify:
**/
@Api(tags = "通用冷热数据分离管理")
@RestController
@RequestMapping(PlatformConstWords.BASE_URL + "/separator")
public class DataSeparatorController {
@Autowired
private IDataSeparatorService dataSeparatorService;
@GetMapping("/query")
@ApiOperation(notes = "根据条件查询数据分离规则", value = "根据条件查询数据分离规则")
public ResultBean query(DataSeparatorRule dataSeparatorRule, Pager pager) {
try {
ListPager<DataSeparatorRule> result = dataSeparatorService.query(dataSeparatorRule, pager);
return ResultBean.success("false").setListPager(result).setMsg("查询成功");
} catch (ImppBusiException busExcep) {
return ResultBean.fail(busExcep).build();
} catch (Exception e) {
return ImppExceptionBuilder.newInstance().buildExceptionResult(e);
}
}
@PostMapping("/add")
@ApiOperation(notes = "添加数据分离规则", value = "添加数据分离规则")
public ResultBean add(@RequestBody DataSeparatorRule dataSeparatorRule) {
try {
dataSeparatorService.addRule(dataSeparatorRule);
return ResultBean.success("添加成功");
} catch (ImppBusiException busExcep) {
return ResultBean.fail(busExcep).build();
} catch (Exception e) {
return ImppExceptionBuilder.newInstance().buildExceptionResult(e);
}
}
@DeleteMapping("/delete/{id}")
@ApiOperation(notes = "逻辑删除数据分离规则", value = "逻辑删除数据分离规则")
public ResultBean delete(@PathVariable Long id) {
try {
dataSeparatorService.deleteRule(id);
return ResultBean.success("删除成功");
} catch (ImppBusiException busExcep) {
return ResultBean.fail(busExcep).build();
} catch (Exception e) {
return ImppExceptionBuilder.newInstance().buildExceptionResult(e);
}
}
@PutMapping("/update")
@ApiOperation(notes = "更新数据分离规则", value = "更新数据分离规则")
public ResultBean update(@RequestBody DataSeparatorRule dataSeparatorRule) {
try {
dataSeparatorService.update(dataSeparatorRule);
return ResultBean.success("修改成功");
} catch (ImppBusiException busExcep) {
return ResultBean.fail(busExcep).build();
} catch (Exception e) {
return ImppExceptionBuilder.newInstance().buildExceptionResult(e);
}
}
}

@ -1,20 +1,34 @@
package cn.estsh.i3plus.core.apiservice.mq;
import cn.estsh.i3plus.platform.common.util.CommonConstWords;
import cn.estsh.impp.framework.boot.util.ImppRedis;
import cn.estsh.i3plus.core.apiservice.serviceimpl.base.DataSeparatorServiceImpl;
import cn.estsh.i3plus.platform.common.tool.JsonUtilTool;
import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository;
import cn.estsh.i3plus.pojo.base.jpa.daoimpl.BaseRepositoryImpl;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage;
import cn.estsh.impp.framework.boot.util.SpringContextsUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.aop.framework.AdvisedSupport;
import org.springframework.aop.framework.AopProxy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import static cn.estsh.i3plus.platform.common.util.PlatformConstWords.QUEUE_IMPP_DATA_SEPARATOR;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @Description :
@ -24,13 +38,15 @@ import static cn.estsh.i3plus.platform.common.util.PlatformConstWords.QUEUE_IMPP
* @Modify:
**/
@Configuration
@ConditionalOnExpression(" '${impp.mq.queue.data.separator:false}' == 'true' ")
@ConditionalOnExpression(" '${impp.aspect.data.separation:false}' == 'true' ")
public class DataSeparatorQueueReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(DataSeparatorQueueReceiver.class);
@Resource(name = CommonConstWords.IMPP_REDIS_RES)
private ImppRedis redisRes;
private static final Map<String, BaseRepositoryImpl> REPOSITORY_HASH_MAP = new HashMap<>();
@Resource
private DataSeparatorServiceImpl dataSeparatorService;
/**
* QUEUE_SWEB_NOTICE
@ -39,14 +55,73 @@ public class DataSeparatorQueueReceiver {
* @throws Exception
*/
@Bean
public Queue getQueueSwebNoticeQueue() {
return new Queue(QUEUE_IMPP_DATA_SEPARATOR);
public Queue getDataSeparatorQueue() {
return new Queue(PlatformConstWords.QUEUE_IMPP_DATA_SEPARATOR);
}
@RabbitListener(queues = QUEUE_IMPP_DATA_SEPARATOR)
public Boolean consumer(Channel channel, Message message){
byte[] msg = message.getBody();
@RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_DATA_SEPARATOR,concurrency = "5")
public Boolean consumer(String data, Channel channel, Message message) {
DataSeparatorMessage msg = JsonUtilTool.decode(data, DataSeparatorMessage.class);
String refClass = msg.getRefClass();
BaseRepository repository;
//找到项目中bean的repository
if (REPOSITORY_HASH_MAP.containsKey(refClass)) {
repository = REPOSITORY_HASH_MAP.get(refClass);
} else {
Map<String, BaseRepository> beansRepositories = SpringContextsUtil.getBeansOfType(BaseRepository.class);
List<BaseRepository> collect = beansRepositories.values().stream().filter(item -> {
String clazz = null;
try {
//第一层代理
Field h = item.getClass().getSuperclass().getDeclaredField("h");
h.setAccessible(true);
AopProxy o = (AopProxy) h.get(item);
Field advised = o.getClass().getDeclaredField("advised");
advised.setAccessible(true);
Object target = ((AdvisedSupport) advised.get(o)).getTargetSource().getTarget();
//第二层代理
Field h1 = target.getClass().getSuperclass().getDeclaredField("h");
h1.setAccessible(true);
AopProxy aopProxy = (AopProxy) h1.get(target);
Field advised1 = aopProxy.getClass().getDeclaredField("advised");
advised1.setAccessible(true);
Object target1 = ((AdvisedSupport) advised1.get(aopProxy)).getTargetSource().getTarget();
Field persistentClass = target1.getClass().getDeclaredField("persistentClass");
persistentClass.setAccessible(true);
clazz = persistentClass.get(target1).toString();
} catch (NoSuchFieldException | IllegalAccessException e) {
return false;
} catch (Exception e) {
LOGGER.error("通用数据分离,获取repository异常 :{}", e.getMessage());
}
return clazz.substring(clazz.lastIndexOf(".") + 1).equals(refClass.substring(refClass.lastIndexOf(".") + 1));
}).collect(Collectors.toList());
if (!collect.isEmpty()) {
repository = collect.get(0);
} else {
LOGGER.error("{}:找不到对应的RepositoryBean!", refClass);
return false;
}
}
try {
dataSeparatorService.doSeparate(repository, msg);
} catch (Exception e) {
LOGGER.error("通用冷热数据分离失败,失败原因:{}", e.getMessage());
} finally {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
LOGGER.error("通用冷热数据分离失败返回ACK失败:{}", e.getMessage());
}
}
return true;
}
}

@ -0,0 +1,117 @@
package cn.estsh.i3plus.core.apiservice.serviceimpl.base;
import cn.estsh.i3plus.core.api.iservice.base.IDataSeparatorService;
import cn.estsh.i3plus.core.apiservice.util.strategy.IDataSeparatorStrategy;
import cn.estsh.i3plus.platform.common.convert.ConvertBean;
import cn.estsh.i3plus.platform.common.util.CommonConstWords;
import cn.estsh.i3plus.pojo.base.bean.DdlPackBean;
import cn.estsh.i3plus.pojo.base.bean.ListPager;
import cn.estsh.i3plus.pojo.base.common.Pager;
import cn.estsh.i3plus.pojo.base.common.PagerHelper;
import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
import cn.estsh.i3plus.pojo.base.jpa.dao.BaseRepository;
import cn.estsh.i3plus.pojo.base.tool.DdlPreparedPack;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorRule;
import cn.estsh.i3plus.pojo.platform.repository.DataSeparatorRepository;
import cn.estsh.i3plus.pojo.platform.sqlpack.CoreHqlPack;
import cn.estsh.impp.framework.boot.auth.AuthUtil;
import cn.estsh.impp.framework.boot.util.ImppRedis;
import cn.estsh.impp.framework.boot.util.SpringContextsUtil;
import com.google.common.base.Strings;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
* @Description :
* @Reference :
* @Author : Castle
* @CreateDate : 2021/12/3 13:44
* @Modify:
**/
@Service
public class DataSeparatorServiceImpl implements IDataSeparatorService {
@Resource(name = CommonConstWords.IMPP_REDIS_RES)
private ImppRedis redisRes;
@Resource
private DataSeparatorRepository dataSeparatorRepository;
@Override
public void doSeparate(BaseRepository baseRepository, DataSeparatorMessage msg) {
//获取数据
Long id = msg.getId();
Object bean = baseRepository.getById(id);
//获取策略
String strategy = ImppEnumUtil.DATA_SEPARATOR_STRATEGY.codeOfStrategyName(msg.getClassification());
IDataSeparatorStrategy separator = (IDataSeparatorStrategy) SpringContextsUtil.getBean(strategy);
//迁移数据
separator.execute(bean, msg);
//删除原数据
baseRepository.deleteById(id);
}
@Override
public boolean addRule(DataSeparatorRule msg) {
ConvertBean.saveOrUpdate(msg, AuthUtil.getSessionUser().getUserName());
DataSeparatorRule rule = dataSeparatorRepository.insert(msg);
String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + rule.getRefBeanName().toUpperCase();
redisRes.putObject(redisKey, rule, -1);
return true;
}
@Override
public boolean deleteRule(Long id) {
DataSeparatorRule rule = dataSeparatorRepository.getById(id);
String redisKey = CommonConstWords.RES_DATA_SEPARATOR + rule.getRefBeanName().toUpperCase();
dataSeparatorRepository.deleteWeaklyById(id, AuthUtil.getSessionUser().getUserName());
redisRes.deleteKey(redisKey);
return true;
}
@Override
public boolean update(DataSeparatorRule msg) {
ConvertBean.saveOrUpdate(msg, AuthUtil.getSessionUser().getUserName());
dataSeparatorRepository.update(msg);
String redisKey = CommonConstWords.RES_DATA_SEPARATOR + ":" + msg.getRefBeanName().toUpperCase();
redisRes.putObject(redisKey, msg, -1);
return true;
}
@Override
public ListPager<DataSeparatorRule> query(DataSeparatorRule rule, Pager pager) {
DdlPackBean ddlPackBean = DdlPackBean.getDdlPackBean(rule.getOrganizeCode());
if (Strings.isNullOrEmpty(rule.getClassification())) {
DdlPreparedPack.getStringEqualPack(rule.getClassification(), "classification", ddlPackBean);
}
if (Strings.isNullOrEmpty(rule.getRefBeanName())) {
DdlPreparedPack.getStringLeftLikerPack(rule.getRefBeanName(), "refBeanName", ddlPackBean);
}
if (Strings.isNullOrEmpty(rule.getDatabaseName())) {
DdlPreparedPack.getStringLeftLikerPack(rule.getDatabaseName(), "databaseName", ddlPackBean);
}
if (Strings.isNullOrEmpty(rule.getRuleColumn())) {
DdlPreparedPack.getStringLeftLikerPack(rule.getRuleColumn(), "ruleColumn", ddlPackBean);
}
pager = PagerHelper.getPager(pager, dataSeparatorRepository.findByHqlWhereCount(ddlPackBean));
return new ListPager(dataSeparatorRepository.findByHqlPage(ddlPackBean, pager), pager);
}
@Override
public void separatorRedisInit() {
//启动加载所有的数据到redis中
List<DataSeparatorRule> list = dataSeparatorRepository.list();
list.forEach(item -> {
String refBeanName = item.getRefBeanName().toUpperCase();
String redisKey = CommonConstWords.RES_DATA_SEPARATOR + refBeanName;
redisRes.putObject(redisKey, item, -1);
});
}
}

@ -0,0 +1,103 @@
package cn.estsh.i3plus.core.apiservice.serviceimpl.base;
import cn.estsh.i3plus.core.apiservice.util.strategy.IDataSeparatorStrategy;
import cn.estsh.i3plus.platform.plugin.datasource.DynamicDataSourceProxy;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.persistence.Column;
import java.lang.reflect.Field;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Description :
* @Reference :
* @Author : Castle
* @CreateDate : 2021/12/3 14:02
* @Modify:
**/
@Component
@Slf4j
public class DataSeparatorStrategyMysql implements IDataSeparatorStrategy {
/**
* url+key
*/
public static final Map<String, DynamicDataSourceProxy> DYNAMIC_DATA_SOURCE_PROXY_MAP = new HashMap<>();
@Override
public void execute(Object bean, DataSeparatorMessage msg) {
//获取动态数据源
DynamicDataSourceProxy dynamicDataSource = getDynamicDataSource(msg);
//封装Map<columnName,value>
HashMap<String, Object> objMap = getObjMap(bean);
//执行sql
String insertSQL = dynamicDataSource.packInsertSQL(msg.getDestTableName(), objMap);
try {
dynamicDataSource.execute(insertSQL);
} catch (SQLException e) {
log.error("执行sql:{}失败,失败原因:{}", insertSQL, e.getMessage());
}
}
/**
*
*
* @param msg
* @return
*/
private DynamicDataSourceProxy getDynamicDataSource(DataSeparatorMessage msg) {
DynamicDataSourceProxy dataSourceProxy;
String destUrl = msg.getDestUrl();
String dataBaseName = msg.getDatabaseName();
String mapKey = destUrl + ":" + dataBaseName;
if (!DYNAMIC_DATA_SOURCE_PROXY_MAP.containsKey(mapKey)) {
String url = "jdbc:mysql://" + destUrl + "/" + dataBaseName + "?autoReconnect=true&useSSL=false&characterEncoding=utf-8";
dataSourceProxy = DynamicDataSourceProxy.initDataSourceFactory("com.mysql.jdbc.Driver",url,msg.getUserName(),msg.getPassword());
DYNAMIC_DATA_SOURCE_PROXY_MAP.put(mapKey, dataSourceProxy);
}
dataSourceProxy = DYNAMIC_DATA_SOURCE_PROXY_MAP.get(mapKey);
return dataSourceProxy;
}
/**
* k-v
*
* @param bean
* @return
*/
private HashMap<String, Object> getObjMap(Object bean) {
Field[] declaredFields = bean.getClass().getDeclaredFields();
Field[] parentFields = bean.getClass().getSuperclass().getDeclaredFields();
HashMap<String, Object> objMap = new HashMap<>(declaredFields.length);
List<Field[]> list = new ArrayList<>();
list.add(declaredFields);
list.add(parentFields);
list.forEach(item -> {
for (Field field : item) {
field.setAccessible(true);
if (null != field.getAnnotation(Column.class)) {
Column annotation = field.getAnnotation(Column.class);
String columnName = annotation.name();
try {
Object value = field.get(bean);
objMap.put(columnName, value);
} catch (IllegalAccessException e) {
log.error("冷热分离Mysql封装数据错误:{}!", columnName);
}
}
}
});
return objMap;
}
}

@ -0,0 +1,20 @@
package cn.estsh.i3plus.core.apiservice.util.strategy;
import cn.estsh.i3plus.pojo.platform.bean.DataSeparatorMessage;
/**
* @Description :
* @Reference :
* @Author : Castle
* @CreateDate : 2021/12/3 13:59
* @Modify:
**/
public interface IDataSeparatorStrategy {
/**
*
* @param bean
* @param msg
*/
void execute(Object bean, DataSeparatorMessage msg);
}
Loading…
Cancel
Save