Merge branch 'dev' into test

yun-zuoyi
wynne1005 4 years ago
commit 74de5329e6

@ -11,6 +11,7 @@ import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
@ -38,12 +39,13 @@ public class CommonQueueReceive {
//绑定交换
exchange = @Exchange(type = ExchangeTypes.FANOUT, value = QueueConstWords.QUEUE_EXCHANGE_COMMON))
})
public void commonQueue(Object object, Channel channel, Message message) {
public Boolean commonQueue(Object object, Channel channel, Message message) {
try {
LOGGER.info("【{}】数据接收成功:{}", QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.QUEUE_COMMON, object);
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return true;
} catch (IOException e) {
LOGGER.error("【{}】处理出错,丢弃:{}", QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.QUEUE_COMMON, e.getMessage(), e);
//丢弃这条消息
@ -54,6 +56,7 @@ public class CommonQueueReceive {
e1.printStackTrace();
}
}
return false;
}
/**
@ -70,12 +73,13 @@ public class CommonQueueReceive {
exchange = @Exchange(type = ExchangeTypes.DIRECT, value = QueueConstWords.QUEUE_EXCHANGE_DIRECT),
key = QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.DIRECT_EXCHANGE_KEY_ROLE)
})
public void directQueue(Object object, Channel channel, Message message) {
public Boolean directQueue(Object object, Channel channel, Message message) {
try {
LOGGER.info("【{}】数据接收成功:{}", QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.QUEUE_COMMON, object);
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return true;
} catch (IOException e) {
LOGGER.error("【{}】处理出错,丢弃:{}", QueueConstWords.CORE_QUEUE_PREFIX + QueueConstWords.QUEUE_COMMON, e.getMessage(), e);
//丢弃这条消息
@ -86,5 +90,6 @@ public class CommonQueueReceive {
e1.printStackTrace();
}
}
return false;
}
}

@ -30,20 +30,22 @@ public class I3CoreQueueReceiver {
/**
*
* @param msg
*
* @param msg
* @param channel
* @param message
* rabbitTemplate.convertAndSend(I3CoreQueueConfig.QUEUE_IMPP_MESSAGE, new SysMessage(....));
* rabbitTemplate.convertAndSend(I3CoreQueueConfig.QUEUE_IMPP_MESSAGE, new SysMessage(....));
*/
@RabbitListener(queues = QUEUE_IMPP_MESSAGE)
public void processImppMessage(SysMessage msg, Channel channel, Message message) {
public Boolean processImppMessage(SysMessage msg, Channel channel, Message message) {
try {
LOGGER.debug("【MQ-QUEUE_IMPP_MESSAGE】数据接收成功{}",msg);
LOGGER.debug("【MQ-QUEUE_IMPP_MESSAGE】数据接收成功{}", msg);
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return true;
} catch (IOException e) {
LOGGER.error("【MQ-QUEUE_IMPP_MESSAGE】处理出错{}",e.getMessage(),e);
LOGGER.error("【MQ-QUEUE_IMPP_MESSAGE】处理出错{}", e.getMessage(), e);
//丢弃这条消息
try {
// 未成功处理,重新发送
@ -52,6 +54,7 @@ public class I3CoreQueueReceiver {
e1.printStackTrace();
}
}
return false;
}
}

@ -60,13 +60,13 @@ public class MessageLetterQueueReceiver {
/**
*
*
* @param data
* @param data
* @param channel
* @param message
* rabbitTemplate.convertAndSend(I3CoreQueueConfig.QUEUE_IMPP_MESSAGE, new SysMessage(....));
* rabbitTemplate.convertAndSend(I3CoreQueueConfig.QUEUE_IMPP_MESSAGE, new SysMessage(....));
*/
@RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_MESSAGE_LETTER)
public void processImppMessage(String data, Channel channel, Message message) {
public Boolean processImppMessage(String data, Channel channel, Message message) {
LOGGER.debug("【MQ-{}】 数据接收成功:{}", PlatformConstWords.QUEUE_IMPP_MESSAGE_LETTER, data);
SysMessage msg = JsonUtilTool.decode(data, SysMessage.class);
try {
@ -116,6 +116,7 @@ public class MessageLetterQueueReceiver {
LOGGER.info("【MQ-{}】站内信{} DeliveryTag:{} 处理成功", PlatformConstWords.QUEUE_IMPP_MESSAGE_LETTER,
msg, message.getMessageProperties().getDeliveryTag());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return true;
} catch (IOException e) {
printErrorMessage(e, Exception.class, data);
try {
@ -128,6 +129,7 @@ public class MessageLetterQueueReceiver {
e1.printStackTrace();
}
}
return false;
}
private void printErrorMessage(Exception e, Class zlass, String msg) {

@ -10,7 +10,11 @@ import cn.estsh.i3plus.platform.common.tool.JsonUtilTool;
import cn.estsh.i3plus.platform.common.tool.TimeTool;
import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
import cn.estsh.i3plus.pojo.platform.bean.*;
import cn.estsh.i3plus.pojo.platform.bean.SysFile;
import cn.estsh.i3plus.pojo.platform.bean.SysFileAttach;
import cn.estsh.i3plus.pojo.platform.bean.SysMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysUser;
import cn.estsh.impp.framework.boot.fileservice.ImppFileService;
import cn.estsh.impp.framework.boot.util.RedisCacheTool;
import com.alibaba.fastjson.JSON;
@ -69,7 +73,7 @@ public class MessageMailQueueReceiver {
* @param message
*/
@RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_MESSAGE_MAIL)
public void processImppMail(String data, Channel channel, Message message) {
public Boolean processImppMail(String data, Channel channel, Message message) {
SysMessage msg = JsonUtilTool.decode(data, SysMessage.class);
long startTime = System.currentTimeMillis();
try {
@ -199,6 +203,7 @@ public class MessageMailQueueReceiver {
} finally {
LOGGER.info("[Core Email] Email Id:{} , 耗时{}, Message Info :{}", msg.getId(), System.currentTimeMillis() - startTime, JSON.toJSONString(msg));
}
return true;
}
/**

@ -54,17 +54,17 @@ public class MessageSWebNoticeQueueReceiver {
/**
* SWEB
*
* @param data
* @param channel
* @param message
* @param data
* @param channel
* @param message
*/
@RabbitListener(queues = PlatformConstWords.QUEUE_SWEB_NOTICE)
public void processImppMail(String data, Channel channel, Message message) {
public Boolean processImppMail(String data, Channel channel, Message message) {
LOGGER.debug("【MQ-{}】 数据接收成功:{}", PlatformConstWords.QUEUE_SWEB_NOTICE, data);
// 添加消息
SysMessage msg = JsonUtilTool.decode(data, SysMessage.class);
try {
if (msg!= null &&StringUtils.isBlank(msg.getCreateUser())) {
if (msg != null && StringUtils.isBlank(msg.getCreateUser())) {
ConvertBean.serviceModelInitialize(msg, msg.getMessageSenderNameRdd());
}
@ -141,6 +141,7 @@ public class MessageSWebNoticeQueueReceiver {
LOGGER.info("【MQ-{}】站内信{} DeliveryTag:{} 处理成功", PlatformConstWords.QUEUE_IMPP_MESSAGE_LETTER,
msg, message.getMessageProperties().getDeliveryTag());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return true;
} catch (Exception e) {
e.printStackTrace();
try {
@ -153,6 +154,7 @@ public class MessageSWebNoticeQueueReceiver {
e1.printStackTrace();
}
}
return false;
}
}

@ -75,7 +75,7 @@ public class MessageWorkWeChatQueueReceiver {
* @param message
*/
@RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_WORK_WECHAT_MSG)
public void processWorkWeChatMsg(String data, Channel channel, Message message) {
public Boolean processWorkWeChatMsg(String data, Channel channel, Message message) {
SysMessage msg = JsonUtilTool.decode(data, SysMessage.class);
Long startTime = System.currentTimeMillis();
try {
@ -138,6 +138,7 @@ public class MessageWorkWeChatQueueReceiver {
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return true;
} catch (Exception e) {
LOGGER.error("【MQ-QUEUE_IMPP_WORK_WECHAT_MSG】处理出错{}", e.getMessage(), e);
//丢弃这条消息
@ -151,6 +152,7 @@ public class MessageWorkWeChatQueueReceiver {
LOGGER.info("[Core Work WeChat] Message Id:{} , 耗时{}, Message Info :{}", msg.getId(),
System.currentTimeMillis() - startTime, JSON.toJSONString(msg));
}
return false;
}
/**
@ -161,13 +163,13 @@ public class MessageWorkWeChatQueueReceiver {
* @param message
*/
@RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_WORK_WECHAT_MSG_CALLBACK)
public void processWorkWeChatMsgCallback(String data, Channel channel, Message message) {
public Boolean processWorkWeChatMsgCallback(String data, Channel channel, Message message) {
MsgTaskCardEvent msgTaskCardEvent = JsonUtilTool.decode(data, MsgTaskCardEvent.class);
Long startTime = System.currentTimeMillis();
try {
if (!WorkWeChatEnumUtil.MSG_EVENT.TASK_CARD_CLICK.getValue().equals(msgTaskCardEvent.getEvent())) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
return true;
}
Long msgId = Long.parseLong(msgTaskCardEvent.getTaskId());
@ -204,5 +206,6 @@ public class MessageWorkWeChatQueueReceiver {
LOGGER.info("[Core Work WeChat] Email Id:{} , 耗时{}, Message Info :{}", msgTaskCardEvent.getTaskId(),
System.currentTimeMillis() - startTime, JSON.toJSONString(msgTaskCardEvent));
}
return true;
}
}

@ -72,7 +72,7 @@ public class PojoVersionQueueReceiver {
}
@RabbitListener(queues = QUEUE_IMPP_POJO_VERSION)
public void processImppMessage(Channel channel, Message message) {
public Boolean processImppMessage(Channel channel, Message message) {
boolean isNack = false;
try {
byte[] messageBody = message.getBody();
@ -179,6 +179,11 @@ public class PojoVersionQueueReceiver {
LOGGER.error("Pojo Version MQ ACK Error Message :{}", e.getMessage());
}
}
if (isNack) {
return false;
} else {
return true;
}
}
/**

@ -56,17 +56,17 @@ public class ScheduleQueueReceiver {
/**
*
*
* @param data
* @param channel
* @param message
* @param data
* @param channel
* @param message
*/
@RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_SCHEDULE)
public void processSchedule(String data, Channel channel, Message message) {
public Boolean processSchedule(String data, Channel channel, Message message) {
try {
SysLogTaskTime logTaskTime = JsonUtilTool.decode(data, SysLogTaskTime.class);
LOGGER.debug("【MQ-QUEUE_IMPP_SCHEDULE】数据接收成功{}", logTaskTime);
// 跟新最后执行时间 及 任务状态
if(logTaskTime != null){
if (logTaskTime != null) {
SysTaskPlan taskPlan = sysTaskPlanService.getSysTaskPlanByNameAndGroup(logTaskTime.getName(), logTaskTime.getGroupName());
if (taskPlan != null) {
taskPlan.setLastRunDateTime(logTaskTime.getCreateDatetime());
@ -128,6 +128,7 @@ public class ScheduleQueueReceiver {
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return true;
} catch (IOException e) {
LOGGER.error("【MQ-IMPP_SCHEDULE_QUEUE】处理出错{}", e.getMessage(), e);
//丢弃这条消息
@ -138,5 +139,6 @@ public class ScheduleQueueReceiver {
e1.printStackTrace();
}
}
return false;
}
}

@ -36,44 +36,45 @@ public class SoftUpdateRecordQueueReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SoftUpdateRecordQueueReceiver.class);
@Resource(name= CommonConstWords.IMPP_REDIS_RES)
@Resource(name = CommonConstWords.IMPP_REDIS_RES)
private ImppRedis redisRes;
@Resource
private ISysSoftUpdateRecordService sysSoftUpdateRecordService;
@Bean
public Queue queueImppSoftVersion(){
public Queue queueImppSoftVersion() {
return new Queue(PlatformConstWords.QUEUE_IMPP_SOFT_VERSION);
}
/**
*
*
* @param data
* @param channel
* @param message
* @param data
* @param channel
* @param message
*/
@RabbitListener(queues = PlatformConstWords.QUEUE_IMPP_SOFT_VERSION)
public void processUpdateRecord(String data, Channel channel, Message message) {
public Boolean processUpdateRecord(String data, Channel channel, Message message) {
try {
LOGGER.debug("【MQ-QUEUE_IMPP_SOFT_VERSION】数据接收成功{}", data);
SysSoftUpdateRecord sysSoftUpdateRecord = JsonUtilTool.decode(data, SysSoftUpdateRecord.class);
if(sysSoftUpdateRecordService.checkSysSoftUpdateRecordOnly(sysSoftUpdateRecord)){
if (sysSoftUpdateRecordService.checkSysSoftUpdateRecordOnly(sysSoftUpdateRecord)) {
sysSoftUpdateRecord.transformUpdateDetail();
sysSoftUpdateRecordService.insert(sysSoftUpdateRecord);
}
String softVersionKey = SOFT_VERSION + ":" + CommonEnumUtil.SOFT_TYPE.valueOfCode(sysSoftUpdateRecord.getSystemSoftType());
String currentVerStr = (String) redisRes.getObject(softVersionKey);
if (ImppSoftVersionService.isUpdateVersion(currentVerStr, sysSoftUpdateRecord.getSystemVersion())) {
if (ImppSoftVersionService.isUpdateVersion(currentVerStr, sysSoftUpdateRecord.getSystemVersion())) {
redisRes.putObject(softVersionKey, sysSoftUpdateRecord.getSystemVersion(), -1);
}
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return true;
} catch (IOException e) {
LOGGER.error("【MQ-QUEUE_IMPP_SOFT_VERSION】处理出错{}", e.getMessage(), e);
//丢弃这条消息
@ -84,5 +85,7 @@ public class SoftUpdateRecordQueueReceiver {
e1.printStackTrace();
}
}
return false;
}
}

@ -41,11 +41,12 @@ public class SysLocalResourceQueue {
* <br/>
* <br/>
* </per>
*
* @param channel
* @param message
*/
@RabbitListener(queues = QUEUE_IMPP_RESOURCE)
public void processImppMessage(String missResourceStr, Channel channel, Message message) {
public Boolean processImppMessage(String missResourceStr, Channel channel, Message message) {
try {
Map<String, String> missResource = JsonUtilTool.decode(missResourceStr, Map.class);
if (missResource != null && !missResource.isEmpty()) {
@ -72,6 +73,7 @@ public class SysLocalResourceQueue {
e.printStackTrace();
}
}
return true;
}
}

@ -46,17 +46,18 @@ public class SysOrderNoRecordQueueReceiver {
}
@RabbitListener(queues = QUEUE_IMPP_ORDER_NO_RECORD)
public void processImppMessage(String orderNoListStr, Channel channel, Message message) {
public Boolean processImppMessage(String orderNoListStr, Channel channel, Message message) {
try {
List<SysOrderNoRule> orderNoRuleList = JsonUtilTool.decode(orderNoListStr, new TypeReference<List<SysOrderNoRule>>(){});
List<SysOrderNoRule> orderNoRuleList = JsonUtilTool.decode(orderNoListStr, new TypeReference<List<SysOrderNoRule>>() {
});
if (CollectionUtils.isNotEmpty(orderNoRuleList)) {
List<SysOrderNoRecord> orderNoRecordList = new ArrayList<>();
orderNoRuleList.forEach(ds -> orderNoRecordList.add(new SysOrderNoRecord(ds.getOrderNoRuleCode(), ds.getOrderNo())));
orderNoRecordService.saveBatch(orderNoRecordList);
}
}catch (Exception e){
} catch (Exception e) {
LOGGER.error("单号生成记录异常", e);
}finally {
} finally {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
LOGGER.info("[MQ ACK] Tag :{} Completed successfully",message.getMessageProperties().getDeliveryTag());
@ -65,6 +66,7 @@ public class SysOrderNoRecordQueueReceiver {
e.printStackTrace();
}
}
return true;
}
}

@ -4,7 +4,6 @@ import cn.estsh.i3plus.core.api.iservice.base.ISynchronizedService;
import cn.estsh.i3plus.core.api.iservice.busi.ISysOrderNoRuleService;
import cn.estsh.i3plus.core.apiservice.util.OrderNoMakeUtil;
import cn.estsh.i3plus.platform.common.exception.ImppExceptionEnum;
import cn.estsh.i3plus.platform.common.tool.JsonUtilTool;
import cn.estsh.i3plus.platform.common.util.CommonConstWords;
import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil;
@ -106,11 +105,12 @@ public class SynchronizedService implements ISynchronizedService {
// 生成单号更缓存
List<SysOrderNoRule> orderNoRuleList = sysOrderNoRuleService.doGetSysOrderNoRuleByNum(codeRole, orderNoTemplate, num);
try {
rabbitTemplate.convertAndSend(PlatformConstWords.QUEUE_IMPP_ORDER_NO_RECORD, JsonUtilTool.encode(orderNoRuleList));
} catch (Exception e) {
LOGGER.error("单号记录推送异常", e);
}
// FIXME 松下推送量太大并且该记录无意义 先注释
// try {
// rabbitTemplate.convertAndSend(PlatformConstWords.QUEUE_IMPP_ORDER_NO_RECORD, JsonUtilTool.encode(orderNoRuleList));
// } catch (Exception e) {
// LOGGER.error("单号记录推送异常", e);
// }
// 生成单号更缓存
return orderNoRuleList;

Loading…
Cancel
Save