|
|
@ -1,14 +1,18 @@
|
|
|
|
package cn.estsh.i3plus.core.apiservice.mq;
|
|
|
|
package cn.estsh.i3plus.core.apiservice.mq;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import cn.estsh.i3plus.core.api.iservice.busi.IPersonnelService;
|
|
|
|
import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService;
|
|
|
|
import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService;
|
|
|
|
import cn.estsh.i3plus.core.api.iservice.busi.ISysUserService;
|
|
|
|
import cn.estsh.i3plus.core.api.iservice.busi.ISysUserService;
|
|
|
|
import cn.estsh.i3plus.core.apiservice.websocket.MessageWebSocket;
|
|
|
|
import cn.estsh.i3plus.core.apiservice.websocket.MessageWebSocket;
|
|
|
|
|
|
|
|
import cn.estsh.i3plus.platform.common.tool.StringTool;
|
|
|
|
import cn.estsh.i3plus.platform.common.tool.TimeTool;
|
|
|
|
import cn.estsh.i3plus.platform.common.tool.TimeTool;
|
|
|
|
import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
|
|
|
|
import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
|
|
|
|
|
|
|
|
import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil;
|
|
|
|
import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
|
|
|
|
import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
|
|
|
|
import cn.estsh.i3plus.pojo.platform.bean.SysMessage;
|
|
|
|
import cn.estsh.i3plus.pojo.platform.bean.SysMessage;
|
|
|
|
import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage;
|
|
|
|
import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage;
|
|
|
|
import cn.estsh.i3plus.pojo.platform.bean.SysUser;
|
|
|
|
import cn.estsh.i3plus.pojo.platform.bean.SysUser;
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
@ -16,11 +20,13 @@ import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import org.springframework.amqp.core.Message;
|
|
|
|
import org.springframework.amqp.core.Message;
|
|
|
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
|
|
|
|
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.io.IOException;
|
|
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
@ -37,78 +43,180 @@ public class MessageLetterQueueReceiver {
|
|
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
@Autowired
|
|
|
|
ISysMessageService sysMessageService;
|
|
|
|
ISysMessageService sysMessageService;
|
|
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
@Autowired
|
|
|
|
ISysUserService sysUserService;
|
|
|
|
ISysUserService sysUserService;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
|
|
|
private IPersonnelService personnelService;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
|
|
|
private RabbitTemplate rabbitTemplate;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static final ObjectMapper MAPPER = new ObjectMapper();
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 站内信处理队列
|
|
|
|
* 站内信处理队列
|
|
|
|
*
|
|
|
|
|
|
|
|
* @param msg
|
|
|
|
* @param msg
|
|
|
|
* @param channel
|
|
|
|
* @param channel
|
|
|
|
* @param message 发送:rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....));
|
|
|
|
* @param message 发送:rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....));
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
@RabbitListener(queues = PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE)
|
|
|
|
@RabbitListener(queues = PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE)
|
|
|
|
public void processImppMessage(SysMessage msg, Channel channel, Message message) {
|
|
|
|
public void processImppMessage(SysMessage msg, Channel channel, Message message) {
|
|
|
|
try {
|
|
|
|
|
|
|
|
LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功:{}", msg);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LOGGER.info("【MQ-{}】 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg);
|
|
|
|
|
|
|
|
LOGGER.info("【MQ-{}】 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg);
|
|
|
|
|
|
|
|
LOGGER.info("【MQ-{}】 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg);
|
|
|
|
|
|
|
|
try {
|
|
|
|
msg = sysMessageService.insertSysMessage(msg);
|
|
|
|
msg = sysMessageService.insertSysMessage(msg);
|
|
|
|
|
|
|
|
|
|
|
|
LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功:{}", msg);
|
|
|
|
if (!StringUtils.isBlank(msg.getMessageReceiversId())) {
|
|
|
|
|
|
|
|
String[] userIds = msg.getMessageReceiversId().split(",");
|
|
|
|
|
|
|
|
|
|
|
|
// 收件人信息
|
|
|
|
if(userIds != null && userIds.length > 0){
|
|
|
|
String[] messageReceiver = new String[0];
|
|
|
|
List<SysUser> userList = personnelService.findSysUserByIds(StringTool.getArrayLong(userIds));
|
|
|
|
String[] receiverName = new String[0];
|
|
|
|
|
|
|
|
if (msg.getMessageReceiversId() != null) {
|
|
|
|
|
|
|
|
messageReceiver = msg.getMessageReceiversId().split(",");
|
|
|
|
|
|
|
|
receiverName = new String[messageReceiver.length];
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SysRefUserMessage refUserMessage;
|
|
|
|
if(userList != null && userList.size() > 0){
|
|
|
|
SysUser sysUser;
|
|
|
|
List<String> names = new ArrayList<>(userList.size());
|
|
|
|
List<SysRefUserMessage> userMessage;
|
|
|
|
List<SysRefUserMessage> insertList = new ArrayList<>(userList.size());
|
|
|
|
ObjectMapper mapper = new ObjectMapper();
|
|
|
|
SysRefUserMessage refUserMessage;
|
|
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < messageReceiver.length; i++) {
|
|
|
|
|
|
|
|
sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i]));
|
|
|
|
|
|
|
|
if(sysUser != null) {
|
|
|
|
|
|
|
|
receiverName[i] = sysUser.getUserName();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
refUserMessage = new SysRefUserMessage();
|
|
|
|
|
|
|
|
refUserMessage.setMessageId(msg.getId());
|
|
|
|
|
|
|
|
refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
|
|
|
|
|
|
|
|
refUserMessage.setMessageTypeRdd(msg.getMessageType());
|
|
|
|
|
|
|
|
refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
|
|
|
|
|
|
|
|
refUserMessage.setReceiverId(sysUser.getId());
|
|
|
|
|
|
|
|
refUserMessage.setReceiverNameRdd(sysUser.getUserName());
|
|
|
|
|
|
|
|
refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
|
|
|
|
|
|
|
|
refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
|
|
|
|
|
|
|
|
refUserMessage.setIsUrgent(msg.getIsUrgent());
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sysMessageService.insertSysRefUserMessage(refUserMessage);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
userMessage = sysMessageService.findSysRefUserMessageByUserIdAndStatus(sysUser.getId(),
|
|
|
|
|
|
|
|
ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
|
|
|
|
|
|
|
|
MessageWebSocket.sendMessage(sysUser.getUserInfoId(),
|
|
|
|
|
|
|
|
mapper.writeValueAsString(userMessage)
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
msg.setMessageReceiversNameRdd(StringUtils.join(receiverName, ","));
|
|
|
|
for (SysUser user : userList){
|
|
|
|
sysMessageService.updateSysMessage(msg);
|
|
|
|
refUserMessage = new SysRefUserMessage();
|
|
|
|
|
|
|
|
refUserMessage.setMessageId(msg.getId());
|
|
|
|
|
|
|
|
refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
|
|
|
|
|
|
|
|
refUserMessage.setMessageTypeRdd(msg.getMessageType());
|
|
|
|
|
|
|
|
refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
|
|
|
|
|
|
|
|
refUserMessage.setReceiverId(user.getId());
|
|
|
|
|
|
|
|
refUserMessage.setReceiverNameRdd(user.getUserName());
|
|
|
|
|
|
|
|
refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
|
|
|
|
|
|
|
|
refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
|
|
|
|
|
|
|
|
refUserMessage.setIsUrgent(msg.getIsUrgent());
|
|
|
|
|
|
|
|
|
|
|
|
//信息已处理
|
|
|
|
names.add(user.getUserName());
|
|
|
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
|
insertList.add(refUserMessage);
|
|
|
|
} catch (IOException e) {
|
|
|
|
}
|
|
|
|
LOGGER.error("【MQ-IMPP_MESSAGE_LETTER_QUEUE】处理出错:{}", e.getMessage(), e);
|
|
|
|
|
|
|
|
//丢弃这条消息
|
|
|
|
List<SysRefUserMessage> refList = sysMessageService.insertSysRefUserMessage(insertList);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 站内信使用 Web Socket 推送
|
|
|
|
|
|
|
|
if(refList != null && refList.size() > 0){
|
|
|
|
|
|
|
|
for (SysRefUserMessage userMessage : refList) {
|
|
|
|
|
|
|
|
MessageWebSocket.sendMessage(userMessage.getReceiverId(), MAPPER.writeValueAsString(userMessage));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 更新冗余信息
|
|
|
|
|
|
|
|
msg.setMessageReceiversNameRdd(StringUtils.join(names,","));
|
|
|
|
|
|
|
|
sysMessageService.updateSysMessage(msg);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}catch (Exception e){
|
|
|
|
|
|
|
|
printErrorMessage(e,Exception.class, msg);
|
|
|
|
|
|
|
|
}finally {
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
// 未成功处理,重新发送
|
|
|
|
// 消息处理完成
|
|
|
|
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
|
|
|
|
LOGGER.info("【MQ-{}】站内信{} DeliveryTag:{} 处理成功", PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg,message.getMessageProperties().getDeliveryTag());
|
|
|
|
} catch (IOException e1) {
|
|
|
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
|
e1.printStackTrace();
|
|
|
|
}catch (IOException e){
|
|
|
|
|
|
|
|
printErrorMessage(e,IOException.class, msg);
|
|
|
|
|
|
|
|
}catch (Exception e){
|
|
|
|
|
|
|
|
printErrorMessage(e,Exception.class, msg);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void printErrorMessage(Exception e,Class zlass,SysMessage msg){
|
|
|
|
|
|
|
|
Long time = System.currentTimeMillis();
|
|
|
|
|
|
|
|
LOGGER.error("【MQ-{}】{} 异常代码:{} 消息内容{},处理出错:{}", PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, time, zlass, msg, e.getMessage());
|
|
|
|
|
|
|
|
LOGGER.info("【MQ-{}】{} 异常代码:{} 消息内容{},处理出错:{}", PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, time, zlass, msg, e.getMessage());
|
|
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//TODO MQ发生异常邮件通知 。
|
|
|
|
|
|
|
|
SysMessage sysMessage = new SysMessage();
|
|
|
|
|
|
|
|
sysMessage.setMessageSenderNameRdd("系统管理员");
|
|
|
|
|
|
|
|
sysMessage.setMessageTitle("【IMPP-MQ异常】站内信消息推送失败");
|
|
|
|
|
|
|
|
sysMessage.setMessageContent("推送站内信["+zlass+"]错误异常代码:"+time);
|
|
|
|
|
|
|
|
sysMessage.setMessageContentType(ImppEnumUtil.MESSAGE_TYPE_CONTENT.HTML.getValue());
|
|
|
|
|
|
|
|
sysMessage.setMessageSendTime(TimeTool.getNowTime(true));
|
|
|
|
|
|
|
|
sysMessage.setIsSystem(CommonEnumUtil.TRUE_OR_FALSE.TRUE.getValue());
|
|
|
|
|
|
|
|
sysMessage.setMessageReceiverType(ImppEnumUtil.MESSAGE_RECEIVER_TYPE.EXTERNAL.getValue());
|
|
|
|
|
|
|
|
sysMessage.setMessageReceiversNameRdd("yunhao.wang@estsh.com,wei.peng@estsh.com");
|
|
|
|
|
|
|
|
rabbitTemplate.convertAndSend(PlatformConstWords.IMPP_MESSAGE_MAIL_QUEUE,sysMessage);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// /**
|
|
|
|
|
|
|
|
// * 站内信处理队列
|
|
|
|
|
|
|
|
// *
|
|
|
|
|
|
|
|
// * @param msg
|
|
|
|
|
|
|
|
// * @param channel
|
|
|
|
|
|
|
|
// * @param message 发送:rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....));
|
|
|
|
|
|
|
|
// */
|
|
|
|
|
|
|
|
// @RabbitListener(queues = PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE)
|
|
|
|
|
|
|
|
// public void processImppMessage(SysMessage msg, Channel channel, Message message) {
|
|
|
|
|
|
|
|
// try {
|
|
|
|
|
|
|
|
// LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功:{}", msg);
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// msg = sysMessageService.insertSysMessage(msg);
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功:{}", msg);
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// // 收件人信息
|
|
|
|
|
|
|
|
// String[] messageReceiver = new String[0];
|
|
|
|
|
|
|
|
// String[] receiverName = new String[0];
|
|
|
|
|
|
|
|
// if (msg.getMessageReceiversId() != null) {
|
|
|
|
|
|
|
|
// messageReceiver = msg.getMessageReceiversId().split(",");
|
|
|
|
|
|
|
|
// receiverName = new String[messageReceiver.length];
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// SysRefUserMessage refUserMessage;
|
|
|
|
|
|
|
|
// SysUser sysUser;
|
|
|
|
|
|
|
|
// List<SysRefUserMessage> userMessage;
|
|
|
|
|
|
|
|
// ObjectMapper mapper = new ObjectMapper();
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// for (int i = 0; i < messageReceiver.length; i++) {
|
|
|
|
|
|
|
|
// sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i]));
|
|
|
|
|
|
|
|
// if(sysUser != null) {
|
|
|
|
|
|
|
|
// receiverName[i] = sysUser.getUserName();
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// refUserMessage = new SysRefUserMessage();
|
|
|
|
|
|
|
|
// refUserMessage.setMessageId(msg.getId());
|
|
|
|
|
|
|
|
// refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
|
|
|
|
|
|
|
|
// refUserMessage.setMessageTypeRdd(msg.getMessageType());
|
|
|
|
|
|
|
|
// refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
|
|
|
|
|
|
|
|
// refUserMessage.setReceiverId(sysUser.getId());
|
|
|
|
|
|
|
|
// refUserMessage.setReceiverNameRdd(sysUser.getUserName());
|
|
|
|
|
|
|
|
// refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
|
|
|
|
|
|
|
|
// refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
|
|
|
|
|
|
|
|
// refUserMessage.setIsUrgent(msg.getIsUrgent());
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// sysMessageService.insertSysRefUserMessage(refUserMessage);
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// userMessage = sysMessageService.findSysRefUserMessageByUserIdAndStatus(sysUser.getId(),
|
|
|
|
|
|
|
|
// ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
|
|
|
|
|
|
|
|
// MessageWebSocket.sendMessage(sysUser.getUserInfoId(),
|
|
|
|
|
|
|
|
// mapper.writeValueAsString(userMessage)
|
|
|
|
|
|
|
|
// );
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// msg.setMessageReceiversNameRdd(StringUtils.join(receiverName, ","));
|
|
|
|
|
|
|
|
// sysMessageService.updateSysMessage(msg);
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// //信息已处理
|
|
|
|
|
|
|
|
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
|
|
|
|
|
// } catch (IOException e) {
|
|
|
|
|
|
|
|
// LOGGER.error("【MQ-IMPP_MESSAGE_LETTER_QUEUE】处理出错:{}", e.getMessage(), e);
|
|
|
|
|
|
|
|
// //丢弃这条消息
|
|
|
|
|
|
|
|
// try {
|
|
|
|
|
|
|
|
// // 未成功处理,重新发送
|
|
|
|
|
|
|
|
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
|
|
|
|
|
|
|
|
// } catch (IOException e1) {
|
|
|
|
|
|
|
|
// e1.printStackTrace();
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
// }
|
|
|
|
}
|
|
|
|
}
|
|
|
|