yun-zuoyi
yunhao.wang 6 years ago
commit a8f23b301b

@ -24,6 +24,9 @@ public interface ISysMessageService {
@ApiOperation(value = "添加消息")
SysMessage insertSysMessage(SysMessage sysMessage);
@ApiOperation(value = "添加消息")
List<SysMessage> insertSysMessage(List<SysMessage> sysMessage);
/**
*
* @param id
@ -77,6 +80,13 @@ public interface ISysMessageService {
SysRefUserMessage insertSysRefUserMessage(SysRefUserMessage refUserMessage);
/**
*
* @param refList
*/
@ApiOperation(value = "添加用户消息关系")
List<SysRefUserMessage> insertSysRefUserMessage(List<SysRefUserMessage> refList);
/**
*
* @param sysMessage
* @return

@ -1,14 +1,18 @@
package cn.estsh.i3plus.core.apiservice.mq;
import cn.estsh.i3plus.core.api.iservice.busi.IPersonnelService;
import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService;
import cn.estsh.i3plus.core.api.iservice.busi.ISysUserService;
import cn.estsh.i3plus.core.apiservice.websocket.MessageWebSocket;
import cn.estsh.i3plus.platform.common.tool.StringTool;
import cn.estsh.i3plus.platform.common.tool.TimeTool;
import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil;
import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
import cn.estsh.i3plus.pojo.platform.bean.SysMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysUser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.StringUtils;
@ -16,11 +20,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
@ -37,78 +43,180 @@ public class MessageLetterQueueReceiver {
@Autowired
ISysMessageService sysMessageService;
@Autowired
ISysUserService sysUserService;
@Autowired
private IPersonnelService personnelService;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final ObjectMapper MAPPER = new ObjectMapper();
/**
*
*
* @param msg
* @param channel
* @param message rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....));
*/
@RabbitListener(queues = PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE)
public void processImppMessage(SysMessage msg, Channel channel, Message message) {
try {
LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功{}", msg);
LOGGER.info("【MQ-{}】 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg);
LOGGER.info("【MQ-{}】 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg);
LOGGER.info("【MQ-{}】 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg);
try {
msg = sysMessageService.insertSysMessage(msg);
LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功{}", msg);
if (!StringUtils.isBlank(msg.getMessageReceiversId())) {
String[] userIds = msg.getMessageReceiversId().split(",");
// 收件人信息
String[] messageReceiver = new String[0];
String[] receiverName = new String[0];
if (msg.getMessageReceiversId() != null) {
messageReceiver = msg.getMessageReceiversId().split(",");
receiverName = new String[messageReceiver.length];
}
if(userIds != null && userIds.length > 0){
List<SysUser> userList = personnelService.findSysUserByIds(StringTool.getArrayLong(userIds));
SysRefUserMessage refUserMessage;
SysUser sysUser;
List<SysRefUserMessage> userMessage;
ObjectMapper mapper = new ObjectMapper();
for (int i = 0; i < messageReceiver.length; i++) {
sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i]));
if(sysUser != null) {
receiverName[i] = sysUser.getUserName();
refUserMessage = new SysRefUserMessage();
refUserMessage.setMessageId(msg.getId());
refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
refUserMessage.setMessageTypeRdd(msg.getMessageType());
refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
refUserMessage.setReceiverId(sysUser.getId());
refUserMessage.setReceiverNameRdd(sysUser.getUserName());
refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
refUserMessage.setIsUrgent(msg.getIsUrgent());
sysMessageService.insertSysRefUserMessage(refUserMessage);
userMessage = sysMessageService.findSysRefUserMessageByUserIdAndStatus(sysUser.getId(),
ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
MessageWebSocket.sendMessage(sysUser.getUserInfoId(),
mapper.writeValueAsString(userMessage)
);
}
}
if(userList != null && userList.size() > 0){
List<String> names = new ArrayList<>(userList.size());
List<SysRefUserMessage> insertList = new ArrayList<>(userList.size());
SysRefUserMessage refUserMessage;
msg.setMessageReceiversNameRdd(StringUtils.join(receiverName, ","));
sysMessageService.updateSysMessage(msg);
for (SysUser user : userList){
refUserMessage = new SysRefUserMessage();
refUserMessage.setMessageId(msg.getId());
refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
refUserMessage.setMessageTypeRdd(msg.getMessageType());
refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
refUserMessage.setReceiverId(user.getId());
refUserMessage.setReceiverNameRdd(user.getUserName());
refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
refUserMessage.setIsUrgent(msg.getIsUrgent());
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
LOGGER.error("【MQ-IMPP_MESSAGE_LETTER_QUEUE】处理出错{}", e.getMessage(), e);
//丢弃这条消息
names.add(user.getUserName());
insertList.add(refUserMessage);
}
List<SysRefUserMessage> refList = sysMessageService.insertSysRefUserMessage(insertList);
// 站内信使用 Web Socket 推送
if(refList != null && refList.size() > 0){
for (SysRefUserMessage userMessage : refList) {
MessageWebSocket.sendMessage(userMessage.getReceiverId(), MAPPER.writeValueAsString(userMessage));
}
}
// 更新冗余信息
msg.setMessageReceiversNameRdd(StringUtils.join(names,","));
sysMessageService.updateSysMessage(msg);
}
}
}
}catch (Exception e){
printErrorMessage(e,Exception.class, msg);
}finally {
try {
// 未成功处理,重新发送
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException e1) {
e1.printStackTrace();
// 消息处理完成
LOGGER.info("【MQ-{}】站内信{} DeliveryTag:{} 处理成功", PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg,message.getMessageProperties().getDeliveryTag());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (IOException e){
printErrorMessage(e,IOException.class, msg);
}catch (Exception e){
printErrorMessage(e,Exception.class, msg);
}
}
}
private void printErrorMessage(Exception e,Class zlass,SysMessage msg){
Long time = System.currentTimeMillis();
LOGGER.error("【MQ-{}】{} 异常代码:{} 消息内容{},处理出错:{}", PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, time, zlass, msg, e.getMessage());
LOGGER.info("【MQ-{}】{} 异常代码:{} 消息内容{},处理出错:{}", PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, time, zlass, msg, e.getMessage());
e.printStackTrace();
//TODO MQ发生异常邮件通知 。
SysMessage sysMessage = new SysMessage();
sysMessage.setMessageSenderNameRdd("系统管理员");
sysMessage.setMessageTitle("【IMPP-MQ异常】站内信消息推送失败");
sysMessage.setMessageContent("推送站内信["+zlass+"]错误异常代码:"+time);
sysMessage.setMessageContentType(ImppEnumUtil.MESSAGE_TYPE_CONTENT.HTML.getValue());
sysMessage.setMessageSendTime(TimeTool.getNowTime(true));
sysMessage.setIsSystem(CommonEnumUtil.TRUE_OR_FALSE.TRUE.getValue());
sysMessage.setMessageReceiverType(ImppEnumUtil.MESSAGE_RECEIVER_TYPE.EXTERNAL.getValue());
sysMessage.setMessageReceiversNameRdd("yunhao.wang@estsh.com,wei.peng@estsh.com");
rabbitTemplate.convertAndSend(PlatformConstWords.IMPP_MESSAGE_MAIL_QUEUE,sysMessage);
}
// /**
// * 站内信处理队列
// *
// * @param msg
// * @param channel
// * @param message 发送rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....));
// */
// @RabbitListener(queues = PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE)
// public void processImppMessage(SysMessage msg, Channel channel, Message message) {
// try {
// LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功{}", msg);
//
// msg = sysMessageService.insertSysMessage(msg);
//
// LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功{}", msg);
//
// // 收件人信息
// String[] messageReceiver = new String[0];
// String[] receiverName = new String[0];
// if (msg.getMessageReceiversId() != null) {
// messageReceiver = msg.getMessageReceiversId().split(",");
// receiverName = new String[messageReceiver.length];
// }
//
// SysRefUserMessage refUserMessage;
// SysUser sysUser;
// List<SysRefUserMessage> userMessage;
// ObjectMapper mapper = new ObjectMapper();
//
// for (int i = 0; i < messageReceiver.length; i++) {
// sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i]));
// if(sysUser != null) {
// receiverName[i] = sysUser.getUserName();
//
// refUserMessage = new SysRefUserMessage();
// refUserMessage.setMessageId(msg.getId());
// refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
// refUserMessage.setMessageTypeRdd(msg.getMessageType());
// refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
// refUserMessage.setReceiverId(sysUser.getId());
// refUserMessage.setReceiverNameRdd(sysUser.getUserName());
// refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
// refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
// refUserMessage.setIsUrgent(msg.getIsUrgent());
//
// sysMessageService.insertSysRefUserMessage(refUserMessage);
//
// userMessage = sysMessageService.findSysRefUserMessageByUserIdAndStatus(sysUser.getId(),
// ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
// MessageWebSocket.sendMessage(sysUser.getUserInfoId(),
// mapper.writeValueAsString(userMessage)
// );
// }
// }
//
// msg.setMessageReceiversNameRdd(StringUtils.join(receiverName, ","));
// sysMessageService.updateSysMessage(msg);
//
// //信息已处理
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// } catch (IOException e) {
// LOGGER.error("【MQ-IMPP_MESSAGE_LETTER_QUEUE】处理出错{}", e.getMessage(), e);
// //丢弃这条消息
// try {
// // 未成功处理,重新发送
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// } catch (IOException e1) {
// e1.printStackTrace();
// }
// }
// }
}

@ -0,0 +1,66 @@
package cn.estsh.i3plus.core.apiservice.mq;
import cn.estsh.i3plus.core.api.iservice.busi.IPersonnelService;
import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService;
import cn.estsh.i3plus.core.api.iservice.busi.ISysUserService;
import cn.estsh.i3plus.core.apiservice.websocket.MessageWebSocket;
import cn.estsh.i3plus.platform.common.tool.StringTool;
import cn.estsh.i3plus.platform.common.tool.TimeTool;
import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil;
import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
import cn.estsh.i3plus.pojo.platform.bean.SysMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysUser;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @Description :
* @Reference :
* @Author : yunhao
* @CreateDate : 2018-11-15 22:35
* @Modify:
**/
@Component
@RabbitListener(queues = PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK)
public class MessageLetterQueueReceiverHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageLetterQueueReceiverHandler.class);
@RabbitHandler
public void handleMessage(byte[] message){
LOGGER.info("【MQ-{}】handleMessage(byte[] message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message);
LOGGER.info("【MQ-{}】handleMessage(byte[] message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message);
LOGGER.info("【MQ-{}】handleMessage(byte[] message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message);
}
@RabbitHandler
public void handleMessage(String message){
LOGGER.info("【MQ-{}】handleMessage(String message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message);
LOGGER.info("【MQ-{}】handleMessage(String message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message);
LOGGER.info("【MQ-{}】handleMessage(String message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message);
}
@RabbitHandler
public void handleMessage(SysMessage message){
LOGGER.info("【MQ-{}】handleMessage(SysMessage message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message);
LOGGER.info("【MQ-{}】handleMessage(SysMessage message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message);
LOGGER.info("【MQ-{}】handleMessage(SysMessage message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message);
}
}

@ -64,48 +64,51 @@ public class MessageMailQueueReceiver {
mailUtil.setContentType(ImppEnumUtil.MESSAGE_TYPE_CONTENT.valueOfDescription(msg.getMessageContentType()));
mailUtil.setBody(msg.getMessageContent());
if(msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.URGENT.getValue()){
// 判断是否为系统紧急提示 微服注册状态提示
mailUtil.setTo(sysConfigService.getSysConfigByCode(PlatformConstWords.CONTACT_MAIL).getConfigValue());
// 次数过于频繁
if(msg.getMessageReceiverType() != null){
if(msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.URGENT.getValue()){
// 判断是否为系统紧急提示 微服注册状态提示
mailUtil.setTo(sysConfigService.getSysConfigByCode(PlatformConstWords.CONTACT_MAIL).getConfigValue());
// 次数过于频繁
// mailUtil.send();
} else if(msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.EXTERNAL.getValue()){
//判断是否为外部邮件
mailUtil.setTo(StringUtils.split(msg.getMessageReceiversNameRdd(),","));
mailUtil.send();
} else if(msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.EXTERNAL.getValue()){
//判断是否为外部邮件
mailUtil.setTo(StringUtils.split(msg.getMessageReceiversNameRdd(),","));
mailUtil.send();
} else if (msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.INTERNAL.getValue()) {
// 收件人信息
String[] messageReceiver = msg.getMessageReceiversId().split(",");
String[] receiverName = new String[messageReceiver.length];
} else if (msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.INTERNAL.getValue()) {
// 收件人信息
String[] messageReceiver = msg.getMessageReceiversId().split(",");
String[] receiverName = new String[messageReceiver.length];
SysRefUserMessage refUserMessage;
SysUser sysUser;
SysRefUserMessage refUserMessage;
SysUser sysUser;
for (int i = 0; i < messageReceiver.length; i++) {
sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i]));
receiverName[i] = sysUser.getUserName();
for (int i = 0; i < messageReceiver.length; i++) {
sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i]));
receiverName[i] = sysUser.getUserName();
refUserMessage = new SysRefUserMessage();
refUserMessage.setMessageId(msg.getId());
refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
refUserMessage.setMessageTypeRdd(msg.getMessageType());
refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
refUserMessage.setReceiverId(sysUser.getId());
refUserMessage.setReceiverNameRdd(sysUser.getUserName());
refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
refUserMessage = new SysRefUserMessage();
refUserMessage.setMessageId(msg.getId());
refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
refUserMessage.setMessageTypeRdd(msg.getMessageType());
refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
refUserMessage.setReceiverId(sysUser.getId());
refUserMessage.setReceiverNameRdd(sysUser.getUserName());
refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
sysMessageService.insertSysRefUserMessage(refUserMessage);
sysMessageService.insertSysRefUserMessage(refUserMessage);
mailUtil.setTo(sysUser.getUserEmail());
mailUtil.send();
}
mailUtil.setTo(sysUser.getUserEmail());
mailUtil.send();
}
msg.setMessageReceiversNameRdd(StringUtils.join(receiverName, ","));
sysMessageService.updateSysMessage(msg);
msg.setMessageReceiversNameRdd(StringUtils.join(receiverName, ","));
sysMessageService.updateSysMessage(msg);
}
}
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
@ -113,7 +116,10 @@ public class MessageMailQueueReceiver {
//丢弃这条消息
try {
// 未成功处理,重新发送
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
//todo 此处对邮件持久化,通过参数设置,是否还需要重复重新发送
// 改为接收成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e1) {
e1.printStackTrace();
}

@ -64,6 +64,13 @@ public class SysMessageService implements ISysMessageService {
}
@Override
@ApiOperation(value = "添加消息")
public List<SysMessage> insertSysMessage(List<SysMessage> list) {
return sysMessageRDao.saveAll(list);
}
@Override
@ApiOperation(value = "删除消息")
public void deleteSysMessageById(Long id) {
LOGGER.info("消息 SYS_MESSAGE id:{}",id);
@ -120,6 +127,12 @@ public class SysMessageService implements ISysMessageService {
}
@Override
@ApiOperation(value = "添加用户消息关系")
public List<SysRefUserMessage> insertSysRefUserMessage(List<SysRefUserMessage> refList) {
return sysRefUserMessageRDao.saveAll(refList);
}
@Override
@ApiOperation(value = "添加消息并发送")
public void doSendSysMessage(SysMessage sysMessage) {
// 判断消息类型推送到对应的队列
@ -127,6 +140,7 @@ public class SysMessageService implements ISysMessageService {
rabbitTemplate.convertAndSend(PlatformConstWords.IMPP_MESSAGE_MAIL_QUEUE,sysMessage);
}else{
rabbitTemplate.convertAndSend(PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE,sysMessage);
rabbitTemplate.convertAndSend(PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE,sysMessage);
}
}

Loading…
Cancel
Save