From b60ed385ec868afda568f2b8b8ac456c10a0b933 Mon Sep 17 00:00:00 2001 From: "wei.peng" Date: Tue, 12 Mar 2019 18:24:44 +0800 Subject: [PATCH] =?UTF-8?q?MQ=20=E7=AB=99=E5=86=85=E4=BF=A1=20=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/api/iservice/busi/ISysMessageService.java | 10 + .../apiservice/mq/MessageLetterQueueReceiver.java | 212 ++++++++++++++++----- .../mq/MessageLetterQueueReceiverHandler.java | 66 +++++++ .../apiservice/mq/MessageMailQueueReceiver.java | 67 +++---- .../serviceimpl/busi/SysMessageService.java | 14 ++ 5 files changed, 285 insertions(+), 84 deletions(-) create mode 100644 modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageLetterQueueReceiverHandler.java diff --git a/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/busi/ISysMessageService.java b/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/busi/ISysMessageService.java index f595bbc..4cf8ad1 100644 --- a/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/busi/ISysMessageService.java +++ b/modules/i3plus-core-api/src/main/java/cn/estsh/i3plus/core/api/iservice/busi/ISysMessageService.java @@ -24,6 +24,9 @@ public interface ISysMessageService { @ApiOperation(value = "添加消息") SysMessage insertSysMessage(SysMessage sysMessage); + @ApiOperation(value = "添加消息") + List insertSysMessage(List sysMessage); + /** * 删除消息 * @param id @@ -77,6 +80,13 @@ public interface ISysMessageService { SysRefUserMessage insertSysRefUserMessage(SysRefUserMessage refUserMessage); /** + * 添加用户消息关系 + * @param refList + */ + @ApiOperation(value = "添加用户消息关系") + List insertSysRefUserMessage(List refList); + + /** * 添加消息并发送 * @param sysMessage * @return diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageLetterQueueReceiver.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageLetterQueueReceiver.java index 6fddb50..5dd8efc 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageLetterQueueReceiver.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageLetterQueueReceiver.java @@ -1,14 +1,18 @@ package cn.estsh.i3plus.core.apiservice.mq; +import cn.estsh.i3plus.core.api.iservice.busi.IPersonnelService; import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService; import cn.estsh.i3plus.core.api.iservice.busi.ISysUserService; import cn.estsh.i3plus.core.apiservice.websocket.MessageWebSocket; +import cn.estsh.i3plus.platform.common.tool.StringTool; import cn.estsh.i3plus.platform.common.tool.TimeTool; import cn.estsh.i3plus.platform.common.util.PlatformConstWords; +import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil; import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil; import cn.estsh.i3plus.pojo.platform.bean.SysMessage; import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage; import cn.estsh.i3plus.pojo.platform.bean.SysUser; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import org.apache.commons.lang3.StringUtils; @@ -16,11 +20,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import java.io.IOException; +import java.util.ArrayList; import java.util.List; /** @@ -37,78 +43,180 @@ public class MessageLetterQueueReceiver { @Autowired ISysMessageService sysMessageService; + @Autowired ISysUserService sysUserService; + @Autowired + private IPersonnelService personnelService; + + @Autowired + private RabbitTemplate rabbitTemplate; + + private static final ObjectMapper MAPPER = new ObjectMapper(); + /** * 站内信处理队列 - * * @param msg * @param channel * @param message 发送:rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....)); */ @RabbitListener(queues = PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE) public void processImppMessage(SysMessage msg, Channel channel, Message message) { - try { - LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功:{}", msg); + LOGGER.info("【MQ-{}】 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg); + LOGGER.info("【MQ-{}】 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg); + LOGGER.info("【MQ-{}】 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg); + try { msg = sysMessageService.insertSysMessage(msg); - LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功:{}", msg); + if (!StringUtils.isBlank(msg.getMessageReceiversId())) { + String[] userIds = msg.getMessageReceiversId().split(","); - // 收件人信息 - String[] messageReceiver = new String[0]; - String[] receiverName = new String[0]; - if (msg.getMessageReceiversId() != null) { - messageReceiver = msg.getMessageReceiversId().split(","); - receiverName = new String[messageReceiver.length]; - } + if(userIds != null && userIds.length > 0){ + List userList = personnelService.findSysUserByIds(StringTool.getArrayLong(userIds)); - SysRefUserMessage refUserMessage; - SysUser sysUser; - List userMessage; - ObjectMapper mapper = new ObjectMapper(); - - for (int i = 0; i < messageReceiver.length; i++) { - sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i])); - if(sysUser != null) { - receiverName[i] = sysUser.getUserName(); - - refUserMessage = new SysRefUserMessage(); - refUserMessage.setMessageId(msg.getId()); - refUserMessage.setMessageTitleRdd(msg.getMessageTitle()); - refUserMessage.setMessageTypeRdd(msg.getMessageType()); - refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd()); - refUserMessage.setReceiverId(sysUser.getId()); - refUserMessage.setReceiverNameRdd(sysUser.getUserName()); - refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); - refUserMessage.setReceiverTime(TimeTool.getNowTime(true)); - refUserMessage.setIsUrgent(msg.getIsUrgent()); - - sysMessageService.insertSysRefUserMessage(refUserMessage); - - userMessage = sysMessageService.findSysRefUserMessageByUserIdAndStatus(sysUser.getId(), - ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); - MessageWebSocket.sendMessage(sysUser.getUserInfoId(), - mapper.writeValueAsString(userMessage) - ); - } - } + if(userList != null && userList.size() > 0){ + List names = new ArrayList<>(userList.size()); + List insertList = new ArrayList<>(userList.size()); + SysRefUserMessage refUserMessage; - msg.setMessageReceiversNameRdd(StringUtils.join(receiverName, ",")); - sysMessageService.updateSysMessage(msg); + for (SysUser user : userList){ + refUserMessage = new SysRefUserMessage(); + refUserMessage.setMessageId(msg.getId()); + refUserMessage.setMessageTitleRdd(msg.getMessageTitle()); + refUserMessage.setMessageTypeRdd(msg.getMessageType()); + refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd()); + refUserMessage.setReceiverId(user.getId()); + refUserMessage.setReceiverNameRdd(user.getUserName()); + refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); + refUserMessage.setReceiverTime(TimeTool.getNowTime(true)); + refUserMessage.setIsUrgent(msg.getIsUrgent()); - //信息已处理 - channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); - } catch (IOException e) { - LOGGER.error("【MQ-IMPP_MESSAGE_LETTER_QUEUE】处理出错:{}", e.getMessage(), e); - //丢弃这条消息 + names.add(user.getUserName()); + insertList.add(refUserMessage); + } + + List refList = sysMessageService.insertSysRefUserMessage(insertList); + + // 站内信使用 Web Socket 推送 + if(refList != null && refList.size() > 0){ + for (SysRefUserMessage userMessage : refList) { + MessageWebSocket.sendMessage(userMessage.getReceiverId(), MAPPER.writeValueAsString(userMessage)); + } + } + + // 更新冗余信息 + msg.setMessageReceiversNameRdd(StringUtils.join(names,",")); + sysMessageService.updateSysMessage(msg); + } + } + } + }catch (Exception e){ + printErrorMessage(e,Exception.class, msg); + }finally { try { - // 未成功处理,重新发送 - channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); - } catch (IOException e1) { - e1.printStackTrace(); + // 消息处理完成 + LOGGER.info("【MQ-{}】站内信{} DeliveryTag:{} 处理成功", PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, msg,message.getMessageProperties().getDeliveryTag()); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + }catch (IOException e){ + printErrorMessage(e,IOException.class, msg); + }catch (Exception e){ + printErrorMessage(e,Exception.class, msg); } } } + + + private void printErrorMessage(Exception e,Class zlass,SysMessage msg){ + Long time = System.currentTimeMillis(); + LOGGER.error("【MQ-{}】{} 异常代码:{} 消息内容{},处理出错:{}", PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, time, zlass, msg, e.getMessage()); + LOGGER.info("【MQ-{}】{} 异常代码:{} 消息内容{},处理出错:{}", PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE, time, zlass, msg, e.getMessage()); + e.printStackTrace(); + + //TODO MQ发生异常邮件通知 。 + SysMessage sysMessage = new SysMessage(); + sysMessage.setMessageSenderNameRdd("系统管理员"); + sysMessage.setMessageTitle("【IMPP-MQ异常】站内信消息推送失败"); + sysMessage.setMessageContent("推送站内信["+zlass+"]错误异常代码:"+time); + sysMessage.setMessageContentType(ImppEnumUtil.MESSAGE_TYPE_CONTENT.HTML.getValue()); + sysMessage.setMessageSendTime(TimeTool.getNowTime(true)); + sysMessage.setIsSystem(CommonEnumUtil.TRUE_OR_FALSE.TRUE.getValue()); + sysMessage.setMessageReceiverType(ImppEnumUtil.MESSAGE_RECEIVER_TYPE.EXTERNAL.getValue()); + sysMessage.setMessageReceiversNameRdd("yunhao.wang@estsh.com,wei.peng@estsh.com"); + rabbitTemplate.convertAndSend(PlatformConstWords.IMPP_MESSAGE_MAIL_QUEUE,sysMessage); + } + + +// /** +// * 站内信处理队列 +// * +// * @param msg +// * @param channel +// * @param message 发送:rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....)); +// */ +// @RabbitListener(queues = PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE) +// public void processImppMessage(SysMessage msg, Channel channel, Message message) { +// try { +// LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功:{}", msg); +// +// msg = sysMessageService.insertSysMessage(msg); +// +// LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功:{}", msg); +// +// // 收件人信息 +// String[] messageReceiver = new String[0]; +// String[] receiverName = new String[0]; +// if (msg.getMessageReceiversId() != null) { +// messageReceiver = msg.getMessageReceiversId().split(","); +// receiverName = new String[messageReceiver.length]; +// } +// +// SysRefUserMessage refUserMessage; +// SysUser sysUser; +// List userMessage; +// ObjectMapper mapper = new ObjectMapper(); +// +// for (int i = 0; i < messageReceiver.length; i++) { +// sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i])); +// if(sysUser != null) { +// receiverName[i] = sysUser.getUserName(); +// +// refUserMessage = new SysRefUserMessage(); +// refUserMessage.setMessageId(msg.getId()); +// refUserMessage.setMessageTitleRdd(msg.getMessageTitle()); +// refUserMessage.setMessageTypeRdd(msg.getMessageType()); +// refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd()); +// refUserMessage.setReceiverId(sysUser.getId()); +// refUserMessage.setReceiverNameRdd(sysUser.getUserName()); +// refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); +// refUserMessage.setReceiverTime(TimeTool.getNowTime(true)); +// refUserMessage.setIsUrgent(msg.getIsUrgent()); +// +// sysMessageService.insertSysRefUserMessage(refUserMessage); +// +// userMessage = sysMessageService.findSysRefUserMessageByUserIdAndStatus(sysUser.getId(), +// ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); +// MessageWebSocket.sendMessage(sysUser.getUserInfoId(), +// mapper.writeValueAsString(userMessage) +// ); +// } +// } +// +// msg.setMessageReceiversNameRdd(StringUtils.join(receiverName, ",")); +// sysMessageService.updateSysMessage(msg); +// +// //信息已处理 +// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); +// } catch (IOException e) { +// LOGGER.error("【MQ-IMPP_MESSAGE_LETTER_QUEUE】处理出错:{}", e.getMessage(), e); +// //丢弃这条消息 +// try { +// // 未成功处理,重新发送 +// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); +// } catch (IOException e1) { +// e1.printStackTrace(); +// } +// } +// } } diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageLetterQueueReceiverHandler.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageLetterQueueReceiverHandler.java new file mode 100644 index 0000000..dc2c3a3 --- /dev/null +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageLetterQueueReceiverHandler.java @@ -0,0 +1,66 @@ +package cn.estsh.i3plus.core.apiservice.mq; + +import cn.estsh.i3plus.core.api.iservice.busi.IPersonnelService; +import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService; +import cn.estsh.i3plus.core.api.iservice.busi.ISysUserService; +import cn.estsh.i3plus.core.apiservice.websocket.MessageWebSocket; +import cn.estsh.i3plus.platform.common.tool.StringTool; +import cn.estsh.i3plus.platform.common.tool.TimeTool; +import cn.estsh.i3plus.platform.common.util.PlatformConstWords; +import cn.estsh.i3plus.pojo.base.enumutil.CommonEnumUtil; +import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil; +import cn.estsh.i3plus.pojo.platform.bean.SysMessage; +import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage; +import cn.estsh.i3plus.pojo.platform.bean.SysUser; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.Channel; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * @Description : 站内信队列处理 + * @Reference : + * @Author : yunhao + * @CreateDate : 2018-11-15 22:35 + * @Modify: + **/ +@Component +@RabbitListener(queues = PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK) +public class MessageLetterQueueReceiverHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(MessageLetterQueueReceiverHandler.class); + + @RabbitHandler + public void handleMessage(byte[] message){ + LOGGER.info("【MQ-{}】handleMessage(byte[] message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message); + LOGGER.info("【MQ-{}】handleMessage(byte[] message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message); + LOGGER.info("【MQ-{}】handleMessage(byte[] message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message); + } + + @RabbitHandler + public void handleMessage(String message){ + LOGGER.info("【MQ-{}】handleMessage(String message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message); + LOGGER.info("【MQ-{}】handleMessage(String message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message); + LOGGER.info("【MQ-{}】handleMessage(String message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message); + } + + @RabbitHandler + public void handleMessage(SysMessage message){ + LOGGER.info("【MQ-{}】handleMessage(SysMessage message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message); + LOGGER.info("【MQ-{}】handleMessage(SysMessage message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message); + LOGGER.info("【MQ-{}】handleMessage(SysMessage message) 数据接收成功:{}",PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE_BAK, message); + } + +} diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageMailQueueReceiver.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageMailQueueReceiver.java index 3c3197d..30b2797 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageMailQueueReceiver.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/MessageMailQueueReceiver.java @@ -64,48 +64,51 @@ public class MessageMailQueueReceiver { mailUtil.setContentType(ImppEnumUtil.MESSAGE_TYPE_CONTENT.valueOfDescription(msg.getMessageContentType())); mailUtil.setBody(msg.getMessageContent()); - if(msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.URGENT.getValue()){ - // 判断是否为系统紧急提示 微服注册状态提示 - mailUtil.setTo(sysConfigService.getSysConfigByCode(PlatformConstWords.CONTACT_MAIL).getConfigValue()); - // 次数过于频繁 + if(msg.getMessageReceiverType() != null){ + if(msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.URGENT.getValue()){ + // 判断是否为系统紧急提示 微服注册状态提示 + mailUtil.setTo(sysConfigService.getSysConfigByCode(PlatformConstWords.CONTACT_MAIL).getConfigValue()); + // 次数过于频繁 // mailUtil.send(); - } else if(msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.EXTERNAL.getValue()){ - //判断是否为外部邮件 - mailUtil.setTo(StringUtils.split(msg.getMessageReceiversNameRdd(),",")); - mailUtil.send(); + } else if(msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.EXTERNAL.getValue()){ + //判断是否为外部邮件 + mailUtil.setTo(StringUtils.split(msg.getMessageReceiversNameRdd(),",")); + mailUtil.send(); - } else if (msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.INTERNAL.getValue()) { - // 收件人信息 - String[] messageReceiver = msg.getMessageReceiversId().split(","); - String[] receiverName = new String[messageReceiver.length]; + } else if (msg.getMessageReceiverType().intValue() == ImppEnumUtil.MESSAGE_RECEIVER_TYPE.INTERNAL.getValue()) { + // 收件人信息 + String[] messageReceiver = msg.getMessageReceiversId().split(","); + String[] receiverName = new String[messageReceiver.length]; - SysRefUserMessage refUserMessage; - SysUser sysUser; + SysRefUserMessage refUserMessage; + SysUser sysUser; - for (int i = 0; i < messageReceiver.length; i++) { - sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i])); - receiverName[i] = sysUser.getUserName(); + for (int i = 0; i < messageReceiver.length; i++) { + sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i])); + receiverName[i] = sysUser.getUserName(); - refUserMessage = new SysRefUserMessage(); - refUserMessage.setMessageId(msg.getId()); - refUserMessage.setMessageTitleRdd(msg.getMessageTitle()); - refUserMessage.setMessageTypeRdd(msg.getMessageType()); - refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd()); - refUserMessage.setReceiverId(sysUser.getId()); - refUserMessage.setReceiverNameRdd(sysUser.getUserName()); - refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); - refUserMessage.setReceiverTime(TimeTool.getNowTime(true)); + refUserMessage = new SysRefUserMessage(); + refUserMessage.setMessageId(msg.getId()); + refUserMessage.setMessageTitleRdd(msg.getMessageTitle()); + refUserMessage.setMessageTypeRdd(msg.getMessageType()); + refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd()); + refUserMessage.setReceiverId(sysUser.getId()); + refUserMessage.setReceiverNameRdd(sysUser.getUserName()); + refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); + refUserMessage.setReceiverTime(TimeTool.getNowTime(true)); - sysMessageService.insertSysRefUserMessage(refUserMessage); + sysMessageService.insertSysRefUserMessage(refUserMessage); - mailUtil.setTo(sysUser.getUserEmail()); - mailUtil.send(); - } + mailUtil.setTo(sysUser.getUserEmail()); + mailUtil.send(); + } - msg.setMessageReceiversNameRdd(StringUtils.join(receiverName, ",")); - sysMessageService.updateSysMessage(msg); + msg.setMessageReceiversNameRdd(StringUtils.join(receiverName, ",")); + sysMessageService.updateSysMessage(msg); + } } + //信息已处理 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/busi/SysMessageService.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/busi/SysMessageService.java index a69b8e8..ed8e024 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/busi/SysMessageService.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/serviceimpl/busi/SysMessageService.java @@ -64,6 +64,13 @@ public class SysMessageService implements ISysMessageService { } @Override + @ApiOperation(value = "添加消息") + public List insertSysMessage(List list) { + return sysMessageRDao.saveAll(list); + } + + + @Override @ApiOperation(value = "删除消息") public void deleteSysMessageById(Long id) { LOGGER.info("消息 SYS_MESSAGE id:{}",id); @@ -120,6 +127,12 @@ public class SysMessageService implements ISysMessageService { } @Override + @ApiOperation(value = "添加用户消息关系") + public List insertSysRefUserMessage(List refList) { + return sysRefUserMessageRDao.saveAll(refList); + } + + @Override @ApiOperation(value = "添加消息并发送") public void doSendSysMessage(SysMessage sysMessage) { // 判断消息类型推送到对应的队列 @@ -127,6 +140,7 @@ public class SysMessageService implements ISysMessageService { rabbitTemplate.convertAndSend(PlatformConstWords.IMPP_MESSAGE_MAIL_QUEUE,sysMessage); }else{ rabbitTemplate.convertAndSend(PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE,sysMessage); + rabbitTemplate.convertAndSend(PlatformConstWords.IMPP_MESSAGE_LETTER_QUEUE,sysMessage); } }